Просмотр исходного кода

Ensure grouping agg behavior on only null fields (ESQL-1311)

This adds an explicit test that makes sure that grouping aggs (`STATS
FOO(x) BY y`) *mostly* return `null` if they receive only null values.
We have code for this scattered around the grouping aggs but we were
only testing it *sometimes*. This tests it all the time. Also! The
behavior wasn't *quite* consistent. `COUNT` and `COUNT(DISTINCT` style
aggs should return `0` but they didn't all do that. And non-`COUNT`
style aggs should return `null` and they all did that. Exception `SUM`
on doubles.
Nik Everett 2 лет назад
Родитель
Сommit
908f0c374a
56 измененных файлов с 1144 добавлено и 253 удалено
  1. 3 4
      x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
  2. 45 76
      x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/GroupingAggregatorImplementer.java
  3. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBooleanAggregatorFunction.java
  4. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBooleanGroupingAggregatorFunction.java
  5. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBytesRefAggregatorFunction.java
  6. 37 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBytesRefGroupingAggregatorFunction.java
  7. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctDoubleAggregatorFunction.java
  8. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctDoubleGroupingAggregatorFunction.java
  9. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunction.java
  10. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctIntGroupingAggregatorFunction.java
  11. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunction.java
  12. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunction.java
  13. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxDoubleAggregatorFunction.java
  14. 45 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxDoubleGroupingAggregatorFunction.java
  15. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxIntAggregatorFunction.java
  16. 45 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxIntGroupingAggregatorFunction.java
  17. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxLongAggregatorFunction.java
  18. 45 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxLongGroupingAggregatorFunction.java
  19. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationDoubleAggregatorFunction.java
  20. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationDoubleGroupingAggregatorFunction.java
  21. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationIntAggregatorFunction.java
  22. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationIntGroupingAggregatorFunction.java
  23. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationLongAggregatorFunction.java
  24. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationLongGroupingAggregatorFunction.java
  25. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinDoubleAggregatorFunction.java
  26. 45 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinDoubleGroupingAggregatorFunction.java
  27. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinIntAggregatorFunction.java
  28. 45 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinIntGroupingAggregatorFunction.java
  29. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinLongAggregatorFunction.java
  30. 45 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinLongGroupingAggregatorFunction.java
  31. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileDoubleAggregatorFunction.java
  32. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileDoubleGroupingAggregatorFunction.java
  33. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileIntAggregatorFunction.java
  34. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileIntGroupingAggregatorFunction.java
  35. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileLongAggregatorFunction.java
  36. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileLongGroupingAggregatorFunction.java
  37. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunction.java
  38. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumDoubleGroupingAggregatorFunction.java
  39. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumIntAggregatorFunction.java
  40. 35 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumIntGroupingAggregatorFunction.java
  41. 3 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java
  42. 45 4
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunction.java
  43. 8 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunction.java
  44. 8 2
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/HllStates.java
  45. 44 16
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumDoubleAggregator.java
  46. 5 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumIntAggregator.java
  47. 30 13
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java
  48. 7 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBooleanGroupingAggregatorFunctionTests.java
  49. 8 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBytesRefGroupingAggregatorFunctionTests.java
  50. 8 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctDoubleGroupingAggregatorFunctionTests.java
  51. 8 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntGroupingAggregatorFunctionTests.java
  52. 8 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java
  53. 7 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java
  54. 115 2
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java
  55. 5 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java
  56. 3 3
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

+ 3 - 4
x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java

@@ -42,7 +42,6 @@ import static org.elasticsearch.compute.gen.Types.BYTES_REF_BLOCK;
 import static org.elasticsearch.compute.gen.Types.BYTES_REF_VECTOR;
 import static org.elasticsearch.compute.gen.Types.DOUBLE_BLOCK;
 import static org.elasticsearch.compute.gen.Types.DOUBLE_VECTOR;
-import static org.elasticsearch.compute.gen.Types.ELEMENT_TYPE;
 import static org.elasticsearch.compute.gen.Types.INT_BLOCK;
 import static org.elasticsearch.compute.gen.Types.INT_VECTOR;
 import static org.elasticsearch.compute.gen.Types.LIST_INTEGER;
@@ -245,9 +244,9 @@ public class AggregatorImplementer {
     private MethodSpec addRawInput() {
         MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawInput");
         builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC).addParameter(PAGE, "page");
-        builder.addStatement("$T type = page.getBlock(channels.get(0)).elementType()", ELEMENT_TYPE);
-        builder.beginControlFlow("if (type == $T.NULL)", ELEMENT_TYPE).addStatement("return").endControlFlow();
-        builder.addStatement("$T block = page.getBlock(channels.get(0))", valueBlockType(init, combine));
+        builder.addStatement("$T uncastBlock = page.getBlock(channels.get(0))", BLOCK);
+        builder.beginControlFlow("if (uncastBlock.areAllValuesNull())").addStatement("return").endControlFlow();
+        builder.addStatement("$T block = ($T) uncastBlock", valueBlockType(init, combine), valueBlockType(init, combine));
         builder.addStatement("$T vector = block.asVector()", valueVectorType(init, combine));
         builder.beginControlFlow("if (vector != null)").addStatement("addRawVector(vector)");
         builder.nextControlFlow("else").addStatement("addRawBlock(block)").endControlFlow();

+ 45 - 76
x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/GroupingAggregatorImplementer.java

@@ -35,6 +35,7 @@ import static org.elasticsearch.compute.gen.Methods.findRequiredMethod;
 import static org.elasticsearch.compute.gen.Types.AGGREGATOR_STATE_VECTOR;
 import static org.elasticsearch.compute.gen.Types.AGGREGATOR_STATE_VECTOR_BUILDER;
 import static org.elasticsearch.compute.gen.Types.BIG_ARRAYS;
+import static org.elasticsearch.compute.gen.Types.BLOCK;
 import static org.elasticsearch.compute.gen.Types.BLOCK_ARRAY;
 import static org.elasticsearch.compute.gen.Types.BYTES_REF;
 import static org.elasticsearch.compute.gen.Types.GROUPING_AGGREGATOR_FUNCTION;
@@ -138,9 +139,11 @@ public class GroupingAggregatorImplementer {
         builder.addMethod(addRawInputStartup(LONG_VECTOR));
         builder.addMethod(addRawInputLoop(LONG_VECTOR, valueBlockType(init, combine)));
         builder.addMethod(addRawInputLoop(LONG_VECTOR, valueVectorType(init, combine)));
+        builder.addMethod(addRawInputLoop(LONG_VECTOR, BLOCK));
         builder.addMethod(addRawInputStartup(LONG_BLOCK));
         builder.addMethod(addRawInputLoop(LONG_BLOCK, valueBlockType(init, combine)));
         builder.addMethod(addRawInputLoop(LONG_BLOCK, valueVectorType(init, combine)));
+        builder.addMethod(addRawInputLoop(LONG_BLOCK, BLOCK));
         builder.addMethod(addIntermediateInput());
         builder.addMethod(addIntermediateRowInput());
         builder.addMethod(evaluateIntermediate());
@@ -197,8 +200,15 @@ public class GroupingAggregatorImplementer {
         MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawInput");
         builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
         builder.addParameter(groupsType, "groups").addParameter(PAGE, "page");
-        builder.addStatement("$T valuesBlock = page.getBlock(channels.get(0))", valueBlockType(init, combine));
         builder.addStatement("assert groups.getPositionCount() == page.getPositionCount()");
+        builder.addStatement("$T uncastValuesBlock = page.getBlock(channels.get(0))", BLOCK);
+        builder.beginControlFlow("if (uncastValuesBlock.areAllValuesNull())");
+        {
+            builder.addStatement("addRawInputAllNulls(groups, uncastValuesBlock)");
+            builder.addStatement("return");
+        }
+        builder.endControlFlow();
+        builder.addStatement("$T valuesBlock = ($T) uncastValuesBlock", valueBlockType(init, combine), valueBlockType(init, combine));
         builder.addStatement("$T valuesVector = valuesBlock.asVector()", valueVectorType(init, combine));
         builder.beginControlFlow("if (valuesVector == null)");
         builder.addStatement("addRawInput(groups, valuesBlock)");
@@ -210,8 +220,19 @@ public class GroupingAggregatorImplementer {
 
     private MethodSpec addRawInputLoop(TypeName groupsType, TypeName valuesType) {
         boolean groupsIsBlock = groupsType.toString().endsWith("Block");
-        boolean valuesIsBlock = valuesType.toString().endsWith("Block");
-        MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawInput");
+        enum ValueType {
+            VECTOR,
+            TYPED_BLOCK,
+            NULL_ONLY_BLOCK
+        }
+        ValueType valueType = valuesType.equals(BLOCK) ? ValueType.NULL_ONLY_BLOCK
+            : valuesType.toString().endsWith("Block") ? ValueType.TYPED_BLOCK
+            : ValueType.VECTOR;
+        String methodName = "addRawInput";
+        if (valueType == ValueType.NULL_ONLY_BLOCK) {
+            methodName += "AllNulls";
+        }
+        MethodSpec.Builder builder = MethodSpec.methodBuilder(methodName);
         builder.addModifiers(Modifier.PRIVATE);
         builder.addParameter(groupsType, "groups").addParameter(valuesType, "values");
         if (valuesIsBytesRef) {
@@ -232,18 +253,23 @@ public class GroupingAggregatorImplementer {
                 builder.addStatement("int groupId = Math.toIntExact(groups.getLong(position))");
             }
 
-            if (valuesIsBlock) {
-                builder.beginControlFlow("if (values.isNull(position))");
-                builder.addStatement("state.putNull(groupId)");
-                builder.addStatement("continue");
-                builder.endControlFlow();
-                builder.addStatement("int valuesStart = values.getFirstValueIndex(position)");
-                builder.addStatement("int valuesEnd = valuesStart + values.getValueCount(position)");
-                builder.beginControlFlow("for (int v = valuesStart; v < valuesEnd; v++)");
-                combineRawInput(builder, "values", "v");
-                builder.endControlFlow();
-            } else {
-                combineRawInput(builder, "values", "position");
+            switch (valueType) {
+                case VECTOR -> combineRawInput(builder, "values", "position");
+                case TYPED_BLOCK -> {
+                    builder.beginControlFlow("if (values.isNull(position))");
+                    builder.addStatement("state.putNull(groupId)");
+                    builder.addStatement("continue");
+                    builder.endControlFlow();
+                    builder.addStatement("int valuesStart = values.getFirstValueIndex(position)");
+                    builder.addStatement("int valuesEnd = valuesStart + values.getValueCount(position)");
+                    builder.beginControlFlow("for (int v = valuesStart; v < valuesEnd; v++)");
+                    combineRawInput(builder, "values", "v");
+                    builder.endControlFlow();
+                }
+                case NULL_ONLY_BLOCK -> {
+                    builder.addStatement("assert values.isNull(position)");
+                    builder.addStatement("state.putNull(groupId)");
+                }
             }
 
             if (groupsIsBlock) {
@@ -254,67 +280,6 @@ public class GroupingAggregatorImplementer {
         return builder.build();
     }
 
-    private MethodSpec addRawInputGroupVectorValuesVector() {
-        MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawInput");
-        builder.addModifiers(Modifier.PRIVATE);
-        builder.addParameter(LONG_VECTOR, "groups").addParameter(valueVectorType(init, combine), "values");
-        if (valuesIsBytesRef) {
-            // Add bytes_ref scratch var that will be used for bytes_ref blocks/vectors
-            builder.addStatement("$T scratch = new $T()", BYTES_REF, BYTES_REF);
-        }
-        builder.beginControlFlow("for (int position = 0; position < groups.getPositionCount(); position++)");
-        {
-            builder.addStatement("int groupId = Math.toIntExact(groups.getLong(position))");
-            combineRawInput(builder, "values", "position");
-        }
-        builder.endControlFlow();
-        return builder.build();
-    }
-
-    private MethodSpec addRawInputGroupBlockValuesBlock() {
-        MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawInput");
-        builder.addModifiers(Modifier.PRIVATE);
-        builder.addParameter(LONG_BLOCK, "groups").addParameter(valueBlockType(init, combine), "values");
-        if (valuesIsBytesRef) {
-            // Add bytes_ref scratch var that will be used for bytes_ref blocks/vectors
-            builder.addStatement("$T scratch = new $T()", BYTES_REF, BYTES_REF);
-        }
-        builder.beginControlFlow("for (int position = 0; position < groups.getPositionCount(); position++)");
-        {
-            builder.beginControlFlow("if (groups.isNull(position) || values.isNull(position)");
-            {
-                builder.addStatement("state.putNull(groupId)");
-                builder.addStatement("continue");
-            }
-            builder.endControlFlow();
-            builder.addStatement("int groupId = Math.toIntExact(groups.getLong(position))");
-            builder.addStatement("int start = values.getFirstValueIndex(position)");
-            builder.addStatement("int end = start + values.getValueCount(position)");
-            builder.beginControlFlow("for (int i = start; i < end; i++)");
-            combineRawInput(builder, "values", "i");
-            builder.endControlFlow();
-        }
-        builder.endControlFlow();
-        return builder.build();
-    }
-
-    private MethodSpec addRawInputGroupBlockValuesVector() {
-        MethodSpec.Builder builder = MethodSpec.methodBuilder("addRawInput");
-        builder.addModifiers(Modifier.PRIVATE);
-        builder.addParameter(LONG_VECTOR, "groups").addParameter(valueVectorType(init, combine), "values");
-        if (valuesIsBytesRef) {
-            // Add bytes_ref scratch var that will be used for bytes_ref blocks/vectors
-            builder.addStatement("$T scratch = new $T()", BYTES_REF, BYTES_REF);
-        }
-        builder.beginControlFlow("for (int position = 0; position < groups.getPositionCount(); position++)");
-        {
-            builder.addStatement("int groupId = Math.toIntExact(groups.getLong(position))");
-            combineRawInput(builder, "values", "position");
-        }
-        builder.endControlFlow();
-        return builder.build();
-    }
-
     private void combineRawInput(MethodSpec.Builder builder, String blockVariable, String offsetVariable) {
         if (valuesIsBytesRef) {
             combineRawInputForBytesRef(builder, blockVariable, offsetVariable);
@@ -402,7 +367,11 @@ public class GroupingAggregatorImplementer {
 
     private void combineStates(MethodSpec.Builder builder) {
         if (combineStates == null) {
+            builder.beginControlFlow("if (inState.hasValue(position))");
             builder.addStatement("state.set($T.combine(state.getOrDefault(groupId), inState.get(position)), groupId)", declarationType);
+            builder.nextControlFlow("else");
+            builder.addStatement("state.putNull(groupId)");
+            builder.endControlFlow();
             return;
         }
         builder.addStatement("$T.combineStates(state, groupId, inState, position)", declarationType);

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBooleanAggregatorFunction.java

@@ -14,7 +14,6 @@ import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BooleanBlock;
 import org.elasticsearch.compute.data.BooleanVector;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.Vector;
@@ -40,11 +39,11 @@ public final class CountDistinctBooleanAggregatorFunction implements AggregatorF
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    BooleanBlock block = page.getBlock(channels.get(0));
+    BooleanBlock block = (BooleanBlock) uncastBlock;
     BooleanVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBooleanGroupingAggregatorFunction.java

@@ -45,8 +45,13 @@ public final class CountDistinctBooleanGroupingAggregatorFunction implements Gro
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    BooleanBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    BooleanBlock valuesBlock = (BooleanBlock) uncastValuesBlock;
     BooleanVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -77,10 +82,23 @@ public final class CountDistinctBooleanGroupingAggregatorFunction implements Gro
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    BooleanBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    BooleanBlock valuesBlock = (BooleanBlock) uncastValuesBlock;
     BooleanVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -125,6 +143,21 @@ public final class CountDistinctBooleanGroupingAggregatorFunction implements Gro
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBytesRefAggregatorFunction.java

@@ -15,7 +15,6 @@ import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.BytesRefVector;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.Vector;
@@ -48,11 +47,11 @@ public final class CountDistinctBytesRefAggregatorFunction implements Aggregator
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    BytesRefBlock block = page.getBlock(channels.get(0));
+    BytesRefBlock block = (BytesRefBlock) uncastBlock;
     BytesRefVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 37 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctBytesRefGroupingAggregatorFunction.java

@@ -49,8 +49,13 @@ public final class CountDistinctBytesRefGroupingAggregatorFunction implements Gr
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    BytesRefBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    BytesRefBlock valuesBlock = (BytesRefBlock) uncastValuesBlock;
     BytesRefVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -83,10 +88,24 @@ public final class CountDistinctBytesRefGroupingAggregatorFunction implements Gr
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    BytesRef scratch = new BytesRef();
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    BytesRefBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    BytesRefBlock valuesBlock = (BytesRefBlock) uncastValuesBlock;
     BytesRefVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -133,6 +152,22 @@ public final class CountDistinctBytesRefGroupingAggregatorFunction implements Gr
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    BytesRef scratch = new BytesRef();
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctDoubleAggregatorFunction.java

@@ -14,7 +14,6 @@ import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.Vector;
@@ -47,11 +46,11 @@ public final class CountDistinctDoubleAggregatorFunction implements AggregatorFu
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    DoubleBlock block = page.getBlock(channels.get(0));
+    DoubleBlock block = (DoubleBlock) uncastBlock;
     DoubleVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctDoubleGroupingAggregatorFunction.java

@@ -48,8 +48,13 @@ public final class CountDistinctDoubleGroupingAggregatorFunction implements Grou
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -80,10 +85,23 @@ public final class CountDistinctDoubleGroupingAggregatorFunction implements Grou
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -128,6 +146,21 @@ public final class CountDistinctDoubleGroupingAggregatorFunction implements Grou
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
@@ -46,11 +45,11 @@ public final class CountDistinctIntAggregatorFunction implements AggregatorFunct
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    IntBlock block = page.getBlock(channels.get(0));
+    IntBlock block = (IntBlock) uncastBlock;
     IntVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctIntGroupingAggregatorFunction.java

@@ -47,8 +47,13 @@ public final class CountDistinctIntGroupingAggregatorFunction implements Groupin
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -79,10 +84,23 @@ public final class CountDistinctIntGroupingAggregatorFunction implements Groupin
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -127,6 +145,21 @@ public final class CountDistinctIntGroupingAggregatorFunction implements Groupin
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
@@ -47,11 +46,11 @@ public final class CountDistinctLongAggregatorFunction implements AggregatorFunc
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    LongBlock block = page.getBlock(channels.get(0));
+    LongBlock block = (LongBlock) uncastBlock;
     LongVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunction.java

@@ -46,8 +46,13 @@ public final class CountDistinctLongGroupingAggregatorFunction implements Groupi
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -78,10 +83,23 @@ public final class CountDistinctLongGroupingAggregatorFunction implements Groupi
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -126,6 +144,21 @@ public final class CountDistinctLongGroupingAggregatorFunction implements Groupi
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxDoubleAggregatorFunction.java

@@ -14,7 +14,6 @@ import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.Vector;
@@ -39,11 +38,11 @@ public final class MaxDoubleAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    DoubleBlock block = page.getBlock(channels.get(0));
+    DoubleBlock block = (DoubleBlock) uncastBlock;
     DoubleVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 45 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxDoubleGroupingAggregatorFunction.java

@@ -41,8 +41,13 @@ public final class MaxDoubleGroupingAggregatorFunction implements GroupingAggreg
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -73,10 +78,23 @@ public final class MaxDoubleGroupingAggregatorFunction implements GroupingAggreg
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -121,6 +139,21 @@ public final class MaxDoubleGroupingAggregatorFunction implements GroupingAggreg
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));
@@ -135,7 +168,11 @@ public final class MaxDoubleGroupingAggregatorFunction implements GroupingAggreg
     blobVector.get(0, inState);
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      state.set(MaxDoubleAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (inState.hasValue(position)) {
+        state.set(MaxDoubleAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      } else {
+        state.putNull(groupId);
+      }
     }
     inState.close();
   }
@@ -146,7 +183,11 @@ public final class MaxDoubleGroupingAggregatorFunction implements GroupingAggreg
       throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
     }
     DoubleArrayState inState = ((MaxDoubleGroupingAggregatorFunction) input).state;
-    state.set(MaxDoubleAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    if (inState.hasValue(position)) {
+      state.set(MaxDoubleAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    } else {
+      state.putNull(groupId);
+    }
   }
 
   @Override

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxIntAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
@@ -38,11 +37,11 @@ public final class MaxIntAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    IntBlock block = page.getBlock(channels.get(0));
+    IntBlock block = (IntBlock) uncastBlock;
     IntVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 45 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxIntGroupingAggregatorFunction.java

@@ -40,8 +40,13 @@ public final class MaxIntGroupingAggregatorFunction implements GroupingAggregato
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -72,10 +77,23 @@ public final class MaxIntGroupingAggregatorFunction implements GroupingAggregato
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -120,6 +138,21 @@ public final class MaxIntGroupingAggregatorFunction implements GroupingAggregato
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));
@@ -134,7 +167,11 @@ public final class MaxIntGroupingAggregatorFunction implements GroupingAggregato
     blobVector.get(0, inState);
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      state.set(MaxIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (inState.hasValue(position)) {
+        state.set(MaxIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      } else {
+        state.putNull(groupId);
+      }
     }
     inState.close();
   }
@@ -145,7 +182,11 @@ public final class MaxIntGroupingAggregatorFunction implements GroupingAggregato
       throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
     }
     IntArrayState inState = ((MaxIntGroupingAggregatorFunction) input).state;
-    state.set(MaxIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    if (inState.hasValue(position)) {
+      state.set(MaxIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    } else {
+      state.putNull(groupId);
+    }
   }
 
   @Override

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxLongAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
@@ -39,11 +38,11 @@ public final class MaxLongAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    LongBlock block = page.getBlock(channels.get(0));
+    LongBlock block = (LongBlock) uncastBlock;
     LongVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 45 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxLongGroupingAggregatorFunction.java

@@ -39,8 +39,13 @@ public final class MaxLongGroupingAggregatorFunction implements GroupingAggregat
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -71,10 +76,23 @@ public final class MaxLongGroupingAggregatorFunction implements GroupingAggregat
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -119,6 +137,21 @@ public final class MaxLongGroupingAggregatorFunction implements GroupingAggregat
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));
@@ -133,7 +166,11 @@ public final class MaxLongGroupingAggregatorFunction implements GroupingAggregat
     blobVector.get(0, inState);
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      state.set(MaxLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (inState.hasValue(position)) {
+        state.set(MaxLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      } else {
+        state.putNull(groupId);
+      }
     }
     inState.close();
   }
@@ -144,7 +181,11 @@ public final class MaxLongGroupingAggregatorFunction implements GroupingAggregat
       throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
     }
     LongArrayState inState = ((MaxLongGroupingAggregatorFunction) input).state;
-    state.set(MaxLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    if (inState.hasValue(position)) {
+      state.set(MaxLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    } else {
+      state.putNull(groupId);
+    }
   }
 
   @Override

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationDoubleAggregatorFunction.java

@@ -14,7 +14,6 @@ import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.Vector;
@@ -40,11 +39,11 @@ public final class MedianAbsoluteDeviationDoubleAggregatorFunction implements Ag
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    DoubleBlock block = page.getBlock(channels.get(0));
+    DoubleBlock block = (DoubleBlock) uncastBlock;
     DoubleVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationDoubleGroupingAggregatorFunction.java

@@ -45,8 +45,13 @@ public final class MedianAbsoluteDeviationDoubleGroupingAggregatorFunction imple
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -77,10 +82,23 @@ public final class MedianAbsoluteDeviationDoubleGroupingAggregatorFunction imple
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -125,6 +143,21 @@ public final class MedianAbsoluteDeviationDoubleGroupingAggregatorFunction imple
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationIntAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
@@ -39,11 +38,11 @@ public final class MedianAbsoluteDeviationIntAggregatorFunction implements Aggre
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    IntBlock block = page.getBlock(channels.get(0));
+    IntBlock block = (IntBlock) uncastBlock;
     IntVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationIntGroupingAggregatorFunction.java

@@ -44,8 +44,13 @@ public final class MedianAbsoluteDeviationIntGroupingAggregatorFunction implemen
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -76,10 +81,23 @@ public final class MedianAbsoluteDeviationIntGroupingAggregatorFunction implemen
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -124,6 +142,21 @@ public final class MedianAbsoluteDeviationIntGroupingAggregatorFunction implemen
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationLongAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
@@ -40,11 +39,11 @@ public final class MedianAbsoluteDeviationLongAggregatorFunction implements Aggr
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    LongBlock block = page.getBlock(channels.get(0));
+    LongBlock block = (LongBlock) uncastBlock;
     LongVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MedianAbsoluteDeviationLongGroupingAggregatorFunction.java

@@ -43,8 +43,13 @@ public final class MedianAbsoluteDeviationLongGroupingAggregatorFunction impleme
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -75,10 +80,23 @@ public final class MedianAbsoluteDeviationLongGroupingAggregatorFunction impleme
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -123,6 +141,21 @@ public final class MedianAbsoluteDeviationLongGroupingAggregatorFunction impleme
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinDoubleAggregatorFunction.java

@@ -14,7 +14,6 @@ import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.Vector;
@@ -39,11 +38,11 @@ public final class MinDoubleAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    DoubleBlock block = page.getBlock(channels.get(0));
+    DoubleBlock block = (DoubleBlock) uncastBlock;
     DoubleVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 45 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinDoubleGroupingAggregatorFunction.java

@@ -41,8 +41,13 @@ public final class MinDoubleGroupingAggregatorFunction implements GroupingAggreg
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -73,10 +78,23 @@ public final class MinDoubleGroupingAggregatorFunction implements GroupingAggreg
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -121,6 +139,21 @@ public final class MinDoubleGroupingAggregatorFunction implements GroupingAggreg
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));
@@ -135,7 +168,11 @@ public final class MinDoubleGroupingAggregatorFunction implements GroupingAggreg
     blobVector.get(0, inState);
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      state.set(MinDoubleAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (inState.hasValue(position)) {
+        state.set(MinDoubleAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      } else {
+        state.putNull(groupId);
+      }
     }
     inState.close();
   }
@@ -146,7 +183,11 @@ public final class MinDoubleGroupingAggregatorFunction implements GroupingAggreg
       throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
     }
     DoubleArrayState inState = ((MinDoubleGroupingAggregatorFunction) input).state;
-    state.set(MinDoubleAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    if (inState.hasValue(position)) {
+      state.set(MinDoubleAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    } else {
+      state.putNull(groupId);
+    }
   }
 
   @Override

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinIntAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
@@ -38,11 +37,11 @@ public final class MinIntAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    IntBlock block = page.getBlock(channels.get(0));
+    IntBlock block = (IntBlock) uncastBlock;
     IntVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 45 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinIntGroupingAggregatorFunction.java

@@ -40,8 +40,13 @@ public final class MinIntGroupingAggregatorFunction implements GroupingAggregato
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -72,10 +77,23 @@ public final class MinIntGroupingAggregatorFunction implements GroupingAggregato
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -120,6 +138,21 @@ public final class MinIntGroupingAggregatorFunction implements GroupingAggregato
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));
@@ -134,7 +167,11 @@ public final class MinIntGroupingAggregatorFunction implements GroupingAggregato
     blobVector.get(0, inState);
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      state.set(MinIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (inState.hasValue(position)) {
+        state.set(MinIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      } else {
+        state.putNull(groupId);
+      }
     }
     inState.close();
   }
@@ -145,7 +182,11 @@ public final class MinIntGroupingAggregatorFunction implements GroupingAggregato
       throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
     }
     IntArrayState inState = ((MinIntGroupingAggregatorFunction) input).state;
-    state.set(MinIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    if (inState.hasValue(position)) {
+      state.set(MinIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    } else {
+      state.putNull(groupId);
+    }
   }
 
   @Override

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinLongAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
@@ -39,11 +38,11 @@ public final class MinLongAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    LongBlock block = page.getBlock(channels.get(0));
+    LongBlock block = (LongBlock) uncastBlock;
     LongVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 45 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinLongGroupingAggregatorFunction.java

@@ -39,8 +39,13 @@ public final class MinLongGroupingAggregatorFunction implements GroupingAggregat
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -71,10 +76,23 @@ public final class MinLongGroupingAggregatorFunction implements GroupingAggregat
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -119,6 +137,21 @@ public final class MinLongGroupingAggregatorFunction implements GroupingAggregat
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));
@@ -133,7 +166,11 @@ public final class MinLongGroupingAggregatorFunction implements GroupingAggregat
     blobVector.get(0, inState);
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      state.set(MinLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (inState.hasValue(position)) {
+        state.set(MinLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      } else {
+        state.putNull(groupId);
+      }
     }
     inState.close();
   }
@@ -144,7 +181,11 @@ public final class MinLongGroupingAggregatorFunction implements GroupingAggregat
       throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
     }
     LongArrayState inState = ((MinLongGroupingAggregatorFunction) input).state;
-    state.set(MinLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    if (inState.hasValue(position)) {
+      state.set(MinLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    } else {
+      state.putNull(groupId);
+    }
   }
 
   @Override

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileDoubleAggregatorFunction.java

@@ -14,7 +14,6 @@ import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.Vector;
@@ -44,11 +43,11 @@ public final class PercentileDoubleAggregatorFunction implements AggregatorFunct
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    DoubleBlock block = page.getBlock(channels.get(0));
+    DoubleBlock block = (DoubleBlock) uncastBlock;
     DoubleVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileDoubleGroupingAggregatorFunction.java

@@ -48,8 +48,13 @@ public final class PercentileDoubleGroupingAggregatorFunction implements Groupin
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -80,10 +85,23 @@ public final class PercentileDoubleGroupingAggregatorFunction implements Groupin
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -128,6 +146,21 @@ public final class PercentileDoubleGroupingAggregatorFunction implements Groupin
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileIntAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
@@ -42,11 +41,11 @@ public final class PercentileIntAggregatorFunction implements AggregatorFunction
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    IntBlock block = page.getBlock(channels.get(0));
+    IntBlock block = (IntBlock) uncastBlock;
     IntVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileIntGroupingAggregatorFunction.java

@@ -47,8 +47,13 @@ public final class PercentileIntGroupingAggregatorFunction implements GroupingAg
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -79,10 +84,23 @@ public final class PercentileIntGroupingAggregatorFunction implements GroupingAg
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -127,6 +145,21 @@ public final class PercentileIntGroupingAggregatorFunction implements GroupingAg
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileLongAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
@@ -43,11 +42,11 @@ public final class PercentileLongAggregatorFunction implements AggregatorFunctio
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    LongBlock block = page.getBlock(channels.get(0));
+    LongBlock block = (LongBlock) uncastBlock;
     LongVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/PercentileLongGroupingAggregatorFunction.java

@@ -46,8 +46,13 @@ public final class PercentileLongGroupingAggregatorFunction implements GroupingA
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -78,10 +83,23 @@ public final class PercentileLongGroupingAggregatorFunction implements GroupingA
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -126,6 +144,21 @@ public final class PercentileLongGroupingAggregatorFunction implements GroupingA
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunction.java

@@ -14,7 +14,6 @@ import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.Vector;
@@ -39,11 +38,11 @@ public final class SumDoubleAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    DoubleBlock block = page.getBlock(channels.get(0));
+    DoubleBlock block = (DoubleBlock) uncastBlock;
     DoubleVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumDoubleGroupingAggregatorFunction.java

@@ -45,8 +45,13 @@ public final class SumDoubleGroupingAggregatorFunction implements GroupingAggreg
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -77,10 +82,23 @@ public final class SumDoubleGroupingAggregatorFunction implements GroupingAggreg
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    DoubleBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    DoubleBlock valuesBlock = (DoubleBlock) uncastValuesBlock;
     DoubleVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -125,6 +143,21 @@ public final class SumDoubleGroupingAggregatorFunction implements GroupingAggreg
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumIntAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
@@ -39,11 +38,11 @@ public final class SumIntAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    IntBlock block = page.getBlock(channels.get(0));
+    IntBlock block = (IntBlock) uncastBlock;
     IntVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 35 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumIntGroupingAggregatorFunction.java

@@ -40,8 +40,13 @@ public final class SumIntGroupingAggregatorFunction implements GroupingAggregato
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -72,10 +77,23 @@ public final class SumIntGroupingAggregatorFunction implements GroupingAggregato
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    IntBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    IntBlock valuesBlock = (IntBlock) uncastValuesBlock;
     IntVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -120,6 +138,21 @@ public final class SumIntGroupingAggregatorFunction implements GroupingAggregato
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));

+ 3 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java

@@ -12,7 +12,6 @@ import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
@@ -39,11 +38,11 @@ public final class SumLongAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addRawInput(Page page) {
-    ElementType type = page.getBlock(channels.get(0)).elementType();
-    if (type == ElementType.NULL) {
+    Block uncastBlock = page.getBlock(channels.get(0));
+    if (uncastBlock.areAllValuesNull()) {
       return;
     }
-    LongBlock block = page.getBlock(channels.get(0));
+    LongBlock block = (LongBlock) uncastBlock;
     LongVector vector = block.asVector();
     if (vector != null) {
       addRawVector(vector);

+ 45 - 4
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunction.java

@@ -39,8 +39,13 @@ public final class SumLongGroupingAggregatorFunction implements GroupingAggregat
 
   @Override
   public void addRawInput(LongVector groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -71,10 +76,23 @@ public final class SumLongGroupingAggregatorFunction implements GroupingAggregat
     }
   }
 
+  private void addRawInputAllNulls(LongVector groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groups.getLong(position));
+      assert values.isNull(position);
+      state.putNull(groupId);
+    }
+  }
+
   @Override
   public void addRawInput(LongBlock groups, Page page) {
-    LongBlock valuesBlock = page.getBlock(channels.get(0));
     assert groups.getPositionCount() == page.getPositionCount();
+    Block uncastValuesBlock = page.getBlock(channels.get(0));
+    if (uncastValuesBlock.areAllValuesNull()) {
+      addRawInputAllNulls(groups, uncastValuesBlock);
+      return;
+    }
+    LongBlock valuesBlock = (LongBlock) uncastValuesBlock;
     LongVector valuesVector = valuesBlock.asVector();
     if (valuesVector == null) {
       addRawInput(groups, valuesBlock);
@@ -119,6 +137,21 @@ public final class SumLongGroupingAggregatorFunction implements GroupingAggregat
     }
   }
 
+  private void addRawInputAllNulls(LongBlock groups, Block values) {
+    for (int position = 0; position < groups.getPositionCount(); position++) {
+      if (groups.isNull(position)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(position);
+      int groupEnd = groupStart + groups.getValueCount(position);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = Math.toIntExact(groups.getLong(g));
+        assert values.isNull(position);
+        state.putNull(groupId);
+      }
+    }
+  }
+
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
     Block block = page.getBlock(channels.get(0));
@@ -133,7 +166,11 @@ public final class SumLongGroupingAggregatorFunction implements GroupingAggregat
     blobVector.get(0, inState);
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      state.set(SumLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (inState.hasValue(position)) {
+        state.set(SumLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      } else {
+        state.putNull(groupId);
+      }
     }
     inState.close();
   }
@@ -144,7 +181,11 @@ public final class SumLongGroupingAggregatorFunction implements GroupingAggregat
       throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
     }
     LongArrayState inState = ((SumLongGroupingAggregatorFunction) input).state;
-    state.set(SumLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    if (inState.hasValue(position)) {
+      state.set(SumLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    } else {
+      state.putNull(groupId);
+    }
   }
 
   @Override

+ 8 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunction.java

@@ -89,6 +89,9 @@ public class CountGroupingAggregatorFunction implements GroupingAggregatorFuncti
     }
 
     private void addRawInput(LongBlock groups, Block values) {
+        if (values.areAllValuesNull()) {
+            return;
+        }
         for (int position = 0; position < groups.getPositionCount(); position++) {
             if (groups.isNull(position)) {
                 continue;
@@ -145,7 +148,11 @@ public class CountGroupingAggregatorFunction implements GroupingAggregatorFuncti
 
     @Override
     public void evaluateFinal(Block[] blocks, int offset, IntVector selected) {
-        blocks[offset] = state.toValuesBlock(selected);
+        LongVector.Builder builder = LongVector.newVectorBuilder(selected.getPositionCount());
+        for (int i = 0; i < selected.getPositionCount(); i++) {
+            builder.appendLong(state.get(selected.getInt(i)));
+        }
+        blocks[offset] = builder.build().asBlock();
     }
 
     @Override

+ 8 - 2
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/HllStates.java

@@ -160,6 +160,12 @@ final class HllStates {
 
         final HyperLogLogPlusPlus hll;
 
+        /**
+         * Maximum group id received. Only needed for estimating max serialization size.
+         * We won't need to do that one day and can remove this.
+         */
+        int maxGroupId;
+
         GroupingState(BigArrays bigArrays, int precision) {
             this.serializer = new GroupingStateSerializer();
             this.hll = new HyperLogLogPlusPlus(HyperLogLogPlusPlus.precisionFromThreshold(precision), bigArrays, 1);
@@ -191,7 +197,7 @@ final class HllStates {
         }
 
         void putNull(int groupId) {
-            // no-op
+            maxGroupId = Math.max(maxGroupId, groupId);
         }
 
         void merge(int groupId, AbstractHyperLogLogPlusPlus other, int otherGroup) {
@@ -201,7 +207,7 @@ final class HllStates {
         @Override
         public long getEstimatedSize() {
             int len = Integer.BYTES; // Serialize number of groups
-            for (int groupId = 0; groupId < hll.maxOrd(); groupId++) {
+            for (int groupId = 0; groupId <= Math.max(hll.maxOrd(), maxGroupId + 1); groupId++) {
                 len += Integer.BYTES; // Serialize length of hll byte array
                 // Serialize hll byte array. Unfortunately, the hll data structure
                 // is not fixed length, so we must serialize it and then get its length

+ 44 - 16
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumDoubleAggregator.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.compute.aggregation;
 
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.DoubleArray;
 import org.elasticsearch.compute.ann.Aggregator;
 import org.elasticsearch.compute.ann.GroupingAggregator;
@@ -51,14 +52,22 @@ class SumDoubleAggregator {
         current.add(v, groupId);
     }
 
-    public static void combineStates(GroupingSumState current, int currentGroupId, GroupingSumState state, int statePosition) {
-        current.add(state.values.get(statePosition), state.deltas.get(statePosition), currentGroupId);
+    public static void combineStates(GroupingSumState current, int groupId, GroupingSumState state, int statePosition) {
+        if (state.hasValue(statePosition)) {
+            current.add(state.values.get(statePosition), state.deltas.get(statePosition), groupId);
+        } else {
+            current.putNull(groupId);
+        }
     }
 
     public static Block evaluateFinal(GroupingSumState state, IntVector selected) {
         DoubleBlock.Builder builder = DoubleBlock.newBlockBuilder(selected.getPositionCount());
         for (int i = 0; i < selected.getPositionCount(); i++) {
-            builder.appendDouble(state.values.get(selected.getInt(i)));
+            if (state.hasValue(i)) {
+                builder.appendDouble(state.values.get(selected.getInt(i)));
+            } else {
+                builder.appendNull();
+            }
         }
         return builder.build();
     }
@@ -143,6 +152,7 @@ class SumDoubleAggregator {
         int largestGroupId;
 
         private final GroupingSumStateSerializer serializer;
+        private BitArray nonNulls;
 
         GroupingSumState(BigArrays bigArrays) {
             this.bigArrays = bigArrays;
@@ -163,31 +173,48 @@ class SumDoubleAggregator {
             add(valueToAdd, 0d, groupId);
         }
 
-        void add(double valueToAdd, double deltaToAdd, int position) {
-            ensureCapacity(position);
+        void add(double valueToAdd, double deltaToAdd, int groupId) {
+            ensureCapacity(groupId);
 
             // If the value is Inf or NaN, just add it to the running tally to "convert" to
             // Inf/NaN. This keeps the behavior bwc from before kahan summing
             if (Double.isFinite(valueToAdd) == false) {
-                values.increment(position, valueToAdd);
+                values.increment(groupId, valueToAdd);
                 return;
             }
 
-            double value = values.get(position);
+            double value = values.get(groupId);
             if (Double.isFinite(value) == false) {
                 // It isn't going to get any more infinite.
                 return;
             }
-            double delta = deltas.get(position);
+            double delta = deltas.get(groupId);
             double correctedSum = valueToAdd + (delta + deltaToAdd);
             double updatedValue = value + correctedSum;
-            deltas.set(position, correctedSum - (updatedValue - value));
-            values.set(position, updatedValue);
+            deltas.set(groupId, correctedSum - (updatedValue - value));
+            values.set(groupId, updatedValue);
+            if (nonNulls != null) {
+                nonNulls.set(groupId);
+            }
+        }
+
+        void putNull(int groupId) {
+            if (groupId > largestGroupId) {
+                ensureCapacity(groupId);
+                largestGroupId = groupId;
+            }
+            if (nonNulls == null) {
+                nonNulls = new BitArray(groupId + 1, bigArrays);
+                for (int i = 0; i < groupId; i++) {
+                    nonNulls.set(i);
+                }
+            } else {
+                nonNulls.ensureCapacity(groupId + 1);
+            }
         }
 
-        void putNull(int position) {
-            // counts = 0 is for nulls
-            ensureCapacity(position);
+        boolean hasValue(int index) {
+            return nonNulls == null || nonNulls.get(index);
         }
 
         private void ensureCapacity(int groupId) {
@@ -200,7 +227,7 @@ class SumDoubleAggregator {
 
         @Override
         public long getEstimatedSize() {
-            return Long.BYTES + (largestGroupId + 1) * BYTES_SIZE;
+            return Long.BYTES + (largestGroupId + 1) * BYTES_SIZE + LongArrayState.estimateSerializeSize(nonNulls);
         }
 
         @Override
@@ -210,7 +237,7 @@ class SumDoubleAggregator {
 
         @Override
         public void close() {
-            Releasables.close(values, deltas);
+            Releasables.close(values, deltas, nonNulls);
         }
     }
 
@@ -237,7 +264,7 @@ class SumDoubleAggregator {
                 doubleHandle.set(ba, offset + 8, state.deltas.get(group));
                 offset += BYTES_SIZE;
             }
-            return 8 + (BYTES_SIZE * selected.getPositionCount()); // number of bytes written
+            return 8 + (BYTES_SIZE * selected.getPositionCount()) + LongArrayState.serializeBitArray(state.nonNulls, ba, offset);
         }
 
         // sets the state in value
@@ -255,6 +282,7 @@ class SumDoubleAggregator {
                 offset += BYTES_SIZE;
             }
             state.largestGroupId = positions - 1;
+            state.nonNulls = LongArrayState.deseralizeBitArray(state.bigArrays, ba, offset);
         }
     }
 }

+ 5 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumIntAggregator.java

@@ -26,6 +26,10 @@ class SumIntAggregator {
     }
 
     public static void combineStates(LongArrayState current, int groupId, LongArrayState state, int position) {
-        current.set(Math.addExact(current.getOrDefault(groupId), state.get(position)), groupId);
+        if (state.hasValue(position)) {
+            current.set(Math.addExact(current.getOrDefault(groupId), state.get(position)), groupId);
+        } else {
+            current.putNull(groupId);
+        }
     }
 }

+ 30 - 13
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java

@@ -22,7 +22,6 @@ import org.elasticsearch.compute.operator.AggregationOperator;
 import org.elasticsearch.compute.operator.CannedSourceOperator;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
-import org.elasticsearch.compute.operator.EmptySourceOperator;
 import org.elasticsearch.compute.operator.ForkingOperatorTestCase;
 import org.elasticsearch.compute.operator.NullInsertingSourceOperator;
 import org.elasticsearch.compute.operator.Operator;
@@ -118,20 +117,38 @@ public abstract class AggregatorFunctionTestCase extends ForkingOperatorTestCase
     }
 
     public final void testEmptyInput() {
-        List<Page> results = new ArrayList<>();
         DriverContext driverContext = new DriverContext();
+        List<Page> results = drive(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext), List.<Page>of().iterator());
+
+        assertThat(results, hasSize(1));
+        assertOutputFromEmpty(results.get(0).getBlock(0));
+    }
+
+    public final void testEmptyInputInitialFinal() {
+        DriverContext driverContext = new DriverContext();
+        List<Page> results = drive(
+            List.of(
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.FINAL).get(driverContext)
+            ),
+            List.<Page>of().iterator()
+        );
+
+        assertThat(results, hasSize(1));
+        assertOutputFromEmpty(results.get(0).getBlock(0));
+    }
+
+    public final void testEmptyInputInitialIntermediateFinal() {
+        DriverContext driverContext = new DriverContext();
+        List<Page> results = drive(
+            List.of(
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INTERMEDIATE).get(driverContext),
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.FINAL).get(driverContext)
+            ),
+            List.<Page>of().iterator()
+        );
 
-        try (
-            Driver d = new Driver(
-                driverContext,
-                new EmptySourceOperator(),
-                List.of(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext)),
-                new PageConsumerOperator(page -> results.add(page)),
-                () -> {}
-            )
-        ) {
-            d.run();
-        }
         assertThat(results, hasSize(1));
         assertOutputFromEmpty(results.get(0).getBlock(0));
     }

+ 7 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBooleanGroupingAggregatorFunctionTests.java

@@ -45,4 +45,11 @@ public class CountDistinctBooleanGroupingAggregatorFunctionTests extends Groupin
         long count = ((LongBlock) result).getLong(position);
         assertThat(count, equalTo(distinct));
     }
+
+    @Override
+    protected void assertOutputFromNullOnly(Block b, int position) {
+        assertThat(b.isNull(position), equalTo(false));
+        assertThat(b.getValueCount(position), equalTo(1));
+        assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L));
+    }
 }

+ 8 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBytesRefGroupingAggregatorFunctionTests.java

@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.stream.LongStream;
 
 import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
 
 public class CountDistinctBytesRefGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase {
 
@@ -50,4 +51,11 @@ public class CountDistinctBytesRefGroupingAggregatorFunctionTests extends Groupi
         // For a number of values close to 10k and precision_threshold=1000, precision should be less than 10%
         assertThat((double) count, closeTo(distinct, distinct * 0.1));
     }
+
+    @Override
+    protected void assertOutputFromNullOnly(Block b, int position) {
+        assertThat(b.isNull(position), equalTo(false));
+        assertThat(b.getValueCount(position), equalTo(1));
+        assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L));
+    }
 }

+ 8 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctDoubleGroupingAggregatorFunctionTests.java

@@ -19,6 +19,7 @@ import java.util.List;
 import java.util.stream.LongStream;
 
 import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
 
 public class CountDistinctDoubleGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase {
 
@@ -48,4 +49,11 @@ public class CountDistinctDoubleGroupingAggregatorFunctionTests extends Grouping
         // For a number of values close to 10k and precision_threshold=1000, precision should be less than 10%
         assertThat((double) count, closeTo(distinct, distinct * 0.1));
     }
+
+    @Override
+    protected void assertOutputFromNullOnly(Block b, int position) {
+        assertThat(b.isNull(position), equalTo(false));
+        assertThat(b.getValueCount(position), equalTo(1));
+        assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L));
+    }
 }

+ 8 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntGroupingAggregatorFunctionTests.java

@@ -19,6 +19,7 @@ import java.util.List;
 import java.util.stream.LongStream;
 
 import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
 
 public class CountDistinctIntGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase {
 
@@ -49,4 +50,11 @@ public class CountDistinctIntGroupingAggregatorFunctionTests extends GroupingAgg
         // For a number of values close to 10k and precision_threshold=1000, precision should be less than 10%
         assertThat((double) count, closeTo(distinct, distinct * 0.1));
     }
+
+    @Override
+    protected void assertOutputFromNullOnly(Block b, int position) {
+        assertThat(b.isNull(position), equalTo(false));
+        assertThat(b.getValueCount(position), equalTo(1));
+        assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L));
+    }
 }

+ 8 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java

@@ -19,6 +19,7 @@ import java.util.List;
 import java.util.stream.LongStream;
 
 import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
 
 public class CountDistinctLongGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase {
     @Override
@@ -45,4 +46,11 @@ public class CountDistinctLongGroupingAggregatorFunctionTests extends GroupingAg
         // For a number of values close to 10k and precision_threshold=1000, precision should be less than 10%
         assertThat((double) count, closeTo(expected, expected * 0.1));
     }
+
+    @Override
+    protected void assertOutputFromNullOnly(Block b, int position) {
+        assertThat(b.isNull(position), equalTo(false));
+        assertThat(b.getValueCount(position), equalTo(1));
+        assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L));
+    }
 }

+ 7 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java

@@ -49,4 +49,11 @@ public class CountGroupingAggregatorFunctionTests extends GroupingAggregatorFunc
         long count = input.stream().flatMapToInt(p -> allValueOffsets(p, group)).count();
         assertThat(((LongBlock) result).getLong(position), equalTo(count));
     }
+
+    @Override
+    protected void assertOutputFromNullOnly(Block b, int position) {
+        assertThat(b.isNull(position), equalTo(false));
+        assertThat(b.getValueCount(position), equalTo(1));
+        assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L));
+    }
 }

+ 115 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java

@@ -17,6 +17,7 @@ import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.CannedSourceOperator;
 import org.elasticsearch.compute.operator.DriverContext;
@@ -27,6 +28,7 @@ import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.PositionMergingSourceOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -69,8 +71,7 @@ public abstract class GroupingAggregatorFunctionTestCase extends ForkingOperator
             + "[channels=[1]], mode=SINGLE]]]";
     }
 
-    @Override
-    protected final void assertSimpleOutput(List<Page> input, List<Page> results) {
+    private SortedSet<Long> seenGroups(List<Page> input) {
         SortedSet<Long> seenGroups = new TreeSet<>();
         for (Page in : input) {
             LongBlock groups = in.getBlock(0);
@@ -85,6 +86,12 @@ public abstract class GroupingAggregatorFunctionTestCase extends ForkingOperator
                 }
             }
         }
+        return seenGroups;
+    }
+
+    @Override
+    protected final void assertSimpleOutput(List<Page> input, List<Page> results) {
+        SortedSet<Long> seenGroups = seenGroups(input);
 
         assertThat(results, hasSize(1));
         assertThat(results.get(0).getBlockCount(), equalTo(2));
@@ -185,6 +192,112 @@ public abstract class GroupingAggregatorFunctionTestCase extends ForkingOperator
         assertSimpleOutput(input, results);
     }
 
+    public final void testNullOnly() {
+        DriverContext driverContext = new DriverContext();
+        assertNullOnly(List.of(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext)));
+    }
+
+    public final void testNullOnlyInputInitialFinal() {
+        DriverContext driverContext = new DriverContext();
+        assertNullOnly(
+            List.of(
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.FINAL).get(driverContext)
+            )
+        );
+    }
+
+    public final void testNullOnlyInputInitialIntermediateFinal() {
+        DriverContext driverContext = new DriverContext();
+        assertNullOnly(
+            List.of(
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INTERMEDIATE).get(driverContext),
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.FINAL).get(driverContext)
+            )
+        );
+    }
+
+    private void assertNullOnly(List<Operator> operators) {
+        List<Page> source = List.of(new Page(LongVector.newVectorBuilder(1).appendLong(0).build().asBlock(), Block.constantNullBlock(1)));
+        List<Page> results = drive(operators, source.iterator());
+
+        assertThat(results, hasSize(1));
+        Block resultBlock = results.get(0).getBlock(1);
+        assertOutputFromNullOnly(resultBlock, 0);
+    }
+
+    public final void testNullSome() {
+        DriverContext driverContext = new DriverContext();
+        assertNullSome(List.of(simple(nonBreakingBigArrays().withCircuitBreaking()).get(driverContext)));
+    }
+
+    public final void testNullSomeInitialFinal() {
+        DriverContext driverContext = new DriverContext();
+        assertNullSome(
+            List.of(
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.FINAL).get(driverContext)
+            )
+        );
+    }
+
+    public final void testNullSomeInitialIntermediateFinal() {
+        DriverContext driverContext = new DriverContext();
+        assertNullSome(
+            List.of(
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INITIAL).get(driverContext),
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.INTERMEDIATE).get(driverContext),
+                simpleWithMode(nonBreakingBigArrays().withCircuitBreaking(), AggregatorMode.FINAL).get(driverContext)
+            )
+        );
+    }
+
+    private void assertNullSome(List<Operator> operators) {
+        List<Page> inputData = CannedSourceOperator.collectPages(simpleInput(1000));
+        SortedSet<Long> seenGroups = seenGroups(inputData);
+
+        long nullGroup = randomFrom(seenGroups);
+        List<Page> source = new ArrayList<>(inputData.size());
+        for (Page page : inputData) {
+            LongVector groups = page.<LongBlock>getBlock(0).asVector();
+            Block values = page.getBlock(1);
+            Block.Builder copiedValues = values.elementType().newBlockBuilder(page.getPositionCount());
+            for (int p = 0; p < page.getPositionCount(); p++) {
+                if (groups.getLong(p) == nullGroup) {
+                    copiedValues.appendNull();
+                } else {
+                    copiedValues.copyFrom(values, p, p + 1);
+                }
+            }
+            source.add(new Page(groups.asBlock(), copiedValues.build()));
+        }
+
+        List<Page> results = drive(operators, source.iterator());
+
+        assertThat(results, hasSize(1));
+        LongVector groups = results.get(0).<LongBlock>getBlock(0).asVector();
+        Block resultBlock = results.get(0).getBlock(1);
+        boolean foundNullPosition = false;
+        for (int p = 0; p < groups.getPositionCount(); p++) {
+            if (groups.getLong(p) == nullGroup) {
+                foundNullPosition = true;
+                assertOutputFromNullOnly(resultBlock, p);
+            }
+        }
+        assertTrue("didn't find the null position. bad position range?", foundNullPosition);
+    }
+
+    /**
+     * Asserts that the output from a group that contains only null values is
+     * a {@link Block} containing only {@code null}. Override for
+     * {@code count} style aggregations that return other sorts of results.
+     */
+    protected void assertOutputFromNullOnly(Block b, int position) {
+        assertThat(b.isNull(position), equalTo(true));
+        assertThat(b.getValueCount(position), equalTo(0));
+    }
+
     private SourceOperator mergeValues(SourceOperator orig) {
         return new PositionMergingSourceOperator(orig) {
             @Override

+ 5 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java

@@ -195,12 +195,16 @@ public abstract class OperatorTestCase extends ESTestCase {
     }
 
     protected final List<Page> drive(Operator operator, Iterator<Page> input) {
+        return drive(List.of(operator), input);
+    }
+
+    protected final List<Page> drive(List<Operator> operators, Iterator<Page> input) {
         List<Page> results = new ArrayList<>();
         try (
             Driver d = new Driver(
                 new DriverContext(),
                 new CannedSourceOperator(input),
-                List.of(operator),
+                operators,
                 new PageConsumerOperator(page -> results.add(page)),
                 () -> {}
             )

+ 3 - 3
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java

@@ -176,7 +176,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         assertEquals(expectedValues, actualValues);
     }
 
-    @AwaitsFix(bugUrl = "1306")
+    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-internal/issues/1306")
     public void testFromGroupingByNumericFieldWithNulls() {
         for (int i = 0; i < 5; i++) {
             client().prepareBulk()
@@ -213,7 +213,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
             .sorted(comparing(c -> c.data))
             .toList();
         assertEquals(expectedGroups, actualGroups);
-        for (int i = 0; i < 5; i++) {
+        for (int i = 0; i < 5; i++) { /// TODO indices are automatically cleaned up. why delete?
             client().prepareBulk()
                 .add(new DeleteRequest("test").id("no_color_" + i))
                 .add(new DeleteRequest("test").id("no_count_red_" + i))
@@ -246,7 +246,7 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
         assertThat(actualGroups, equalTo(expectedGroups));
     }
 
-    @AwaitsFix(bugUrl = "1306")
+    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch-internal/issues/1306")
     public void testFromStatsGroupingByKeywordWithNulls() {
         for (int i = 0; i < 5; i++) {
             client().prepareBulk()