Browse Source

Replace aggstate in aggs whose internal state is one of the primitives (ESQL-1375)

This commit moves aggs whose internal state is one of the primitives,
over to the new intermediate agg state mechanism.

We can then remove quite a bit of the internal serialization logic.
Chris Hegarty 2 years ago
parent
commit
3aeb7d5faf
41 changed files with 481 additions and 802 deletions
  1. 36 7
      x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java
  2. 45 9
      x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/GroupingAggregatorImplementer.java
  3. 15 0
      x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Methods.java
  4. 18 42
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleArrayState.java
  5. 12 33
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleState.java
  6. 17 42
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntArrayState.java
  7. 12 33
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntState.java
  8. 18 79
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongArrayState.java
  9. 12 33
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongState.java
  10. 14 23
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxDoubleAggregatorFunction.java
  11. 12 20
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxDoubleGroupingAggregatorFunction.java
  12. 14 22
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxIntAggregatorFunction.java
  13. 12 20
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxIntGroupingAggregatorFunction.java
  14. 14 23
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxLongAggregatorFunction.java
  15. 12 20
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxLongGroupingAggregatorFunction.java
  16. 14 23
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinDoubleAggregatorFunction.java
  17. 12 20
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinDoubleGroupingAggregatorFunction.java
  18. 14 22
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinIntAggregatorFunction.java
  19. 12 20
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinIntGroupingAggregatorFunction.java
  20. 5 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinLongAggregatorFunction.java
  21. 9 2
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinLongGroupingAggregatorFunction.java
  22. 1 1
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunction.java
  23. 1 1
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumDoubleGroupingAggregatorFunction.java
  24. 15 22
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumIntAggregatorFunction.java
  25. 20 20
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumIntGroupingAggregatorFunction.java
  26. 14 23
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java
  27. 12 20
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunction.java
  28. 18 22
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountAggregatorFunction.java
  29. 17 21
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunction.java
  30. 1 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MaxDoubleAggregator.java
  31. 1 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MaxIntAggregator.java
  32. 1 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MaxLongAggregator.java
  33. 1 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MinDoubleAggregator.java
  34. 1 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MinIntAggregator.java
  35. 0 44
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MinLongAggregator.java
  36. 8 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumDoubleAggregator.java
  37. 3 11
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumIntAggregator.java
  38. 1 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java
  39. 23 81
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ArrayState.java.st
  40. 12 33
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-State.java.st
  41. 2 2
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

+ 36 - 7
x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/AggregatorImplementer.java

@@ -32,6 +32,7 @@ import javax.lang.model.util.Elements;
 import static java.util.stream.Collectors.joining;
 import static org.elasticsearch.compute.gen.Methods.findMethod;
 import static org.elasticsearch.compute.gen.Methods.findRequiredMethod;
+import static org.elasticsearch.compute.gen.Methods.vectorAccessorName;
 import static org.elasticsearch.compute.gen.Types.AGGREGATOR_FUNCTION;
 import static org.elasticsearch.compute.gen.Types.AGGREGATOR_STATE_VECTOR;
 import static org.elasticsearch.compute.gen.Types.AGGREGATOR_STATE_VECTOR_BUILDER;
@@ -398,7 +399,7 @@ public class AggregatorImplementer {
     private MethodSpec addIntermediateInput() {
         MethodSpec.Builder builder = MethodSpec.methodBuilder("addIntermediateInput");
         builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC).addParameter(PAGE, "page");
-        if (combineIntermediate != null) {
+        if (isAggState() == false) {
             builder.addStatement("assert channels.size() == intermediateBlockCount()");
             builder.addStatement("assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size()");
             int count = 0;
@@ -420,10 +421,25 @@ public class AggregatorImplementer {
                         .map(s -> first + ".getPositionCount() == " + s + ".getPositionCount()")
                         .collect(joining(" && "))
             );
-            builder.addStatement(
-                "$T.combineIntermediate(state, " + intermediateState.stream().map(IntermediateStateDesc::name).collect(joining(", ")) + ")",
-                declarationType
-            );
+            if (hasPrimitiveState()) {
+                assert intermediateState.size() == 2;
+                assert intermediateState.get(1).name().equals("seen");
+                builder.beginControlFlow("if (seen.getBoolean(0))");
+                {
+                    var state = intermediateState.get(0);
+                    var s = "state.$L($T.combine(state.$L(), " + state.name() + "." + vectorAccessorName(state.elementType()) + "(0)))";
+                    builder.addStatement(s, primitiveStateMethod(), declarationType, primitiveStateMethod());
+                    builder.addStatement("state.seen(true)");
+                    builder.endControlFlow();
+                }
+            } else {
+                builder.addStatement(
+                    "$T.combineIntermediate(state, "
+                        + intermediateState.stream().map(IntermediateStateDesc::name).collect(joining(", "))
+                        + ")",
+                    declarationType
+                );
+            }
         } else {
             builder.addStatement("Block block = page.getBlock(channels.get(0))");
             builder.addStatement("$T vector = block.asVector()", VECTOR);
@@ -480,8 +496,9 @@ public class AggregatorImplementer {
             .addModifiers(Modifier.PUBLIC)
             .addParameter(BLOCK_ARRAY, "blocks")
             .addParameter(TypeName.INT, "offset");
-        if (combineIntermediate != null) {
-            builder.addStatement("$T.evaluateIntermediate(state, blocks, offset)", declarationType);
+        if (isAggState() == false) {
+            assert hasPrimitiveState();
+            builder.addStatement("state.toIntermediate(blocks, offset)");
         } else {
             ParameterizedTypeName stateBlockBuilderType = ParameterizedTypeName.get(
                 AGGREGATOR_STATE_VECTOR_BUILDER,
@@ -557,4 +574,16 @@ public class AggregatorImplementer {
     private ParameterizedTypeName stateBlockType() {
         return ParameterizedTypeName.get(AGGREGATOR_STATE_VECTOR, stateType);
     }
+
+    private boolean isAggState() {
+        return intermediateState.get(0).name().equals("aggstate");
+    }
+
+    private boolean hasPrimitiveState() {
+        return switch (stateType.toString()) {
+            case "org.elasticsearch.compute.aggregation.IntState", "org.elasticsearch.compute.aggregation.LongState",
+                "org.elasticsearch.compute.aggregation.DoubleState" -> true;
+            default -> false;
+        };
+    }
 }

+ 45 - 9
x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/GroupingAggregatorImplementer.java

@@ -36,6 +36,7 @@ import static org.elasticsearch.compute.gen.AggregatorImplementer.valueBlockType
 import static org.elasticsearch.compute.gen.AggregatorImplementer.valueVectorType;
 import static org.elasticsearch.compute.gen.Methods.findMethod;
 import static org.elasticsearch.compute.gen.Methods.findRequiredMethod;
+import static org.elasticsearch.compute.gen.Methods.vectorAccessorName;
 import static org.elasticsearch.compute.gen.Types.AGGREGATOR_STATE_VECTOR;
 import static org.elasticsearch.compute.gen.Types.AGGREGATOR_STATE_VECTOR_BUILDER;
 import static org.elasticsearch.compute.gen.Types.BIG_ARRAYS;
@@ -394,7 +395,7 @@ public class GroupingAggregatorImplementer {
         builder.addAnnotation(Override.class).addModifiers(Modifier.PUBLIC);
         builder.addParameter(LONG_VECTOR, "groupIdVector").addParameter(PAGE, "page");
 
-        if (combineIntermediate != null) {
+        if (isAggState() == false) {
             builder.addStatement("assert channels.size() == intermediateBlockCount()");
             builder.addStatement("assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size()");
             int count = 0;
@@ -415,12 +416,34 @@ public class GroupingAggregatorImplementer {
                         .map(s -> first + ".getPositionCount() == " + s + ".getPositionCount()")
                         .collect(joining(" && "))
             );
-            builder.addStatement(
-                "$T.combineIntermediate(groupIdVector, state, "
-                    + intermediateState.stream().map(IntermediateStateDesc::name).collect(joining(", "))
-                    + ")",
-                declarationType
-            );
+            if (hasPrimitiveState()) {
+                assert intermediateState.size() == 2;
+                assert intermediateState.get(1).name().equals("seen");
+                builder.beginControlFlow("for (int position = 0; position < groupIdVector.getPositionCount(); position++)");
+                {
+                    builder.addStatement("int groupId = Math.toIntExact(groupIdVector.getLong(position))");
+                    builder.beginControlFlow("if (seen.getBoolean(position))");
+                    {
+                        var name = intermediateState.get(0).name();
+                        var m = vectorAccessorName(intermediateState.get(0).elementType());
+                        builder.addStatement(
+                            "state.set($T.combine(state.getOrDefault(groupId), " + name + "." + m + "(position)), groupId)",
+                            declarationType
+                        );
+                        builder.nextControlFlow("else");
+                        builder.addStatement("state.putNull(groupId)");
+                        builder.endControlFlow();
+                    }
+                    builder.endControlFlow();
+                }
+            } else {
+                builder.addStatement(
+                    "$T.combineIntermediate(groupIdVector, state, "
+                        + intermediateState.stream().map(IntermediateStateDesc::name).collect(joining(", "))
+                        + ")",
+                    declarationType
+                );
+            }
         } else {
             builder.addStatement("Block block = page.getBlock(channels.get(0))");
             builder.addStatement("$T vector = block.asVector()", VECTOR);
@@ -478,8 +501,9 @@ public class GroupingAggregatorImplementer {
             .addParameter(BLOCK_ARRAY, "blocks")
             .addParameter(TypeName.INT, "offset")
             .addParameter(INT_VECTOR, "selected");
-        if (combineIntermediate != null) {
-            builder.addStatement("$T.evaluateIntermediate(state, blocks, offset, selected)", declarationType);
+        if (isAggState() == false) {
+            assert hasPrimitiveState();
+            builder.addStatement("state.toIntermediate(blocks, offset, selected)");
         } else {
             ParameterizedTypeName stateBlockBuilderType = ParameterizedTypeName.get(
                 AGGREGATOR_STATE_VECTOR_BUILDER,
@@ -534,4 +558,16 @@ public class GroupingAggregatorImplementer {
     private ParameterizedTypeName stateBlockType() {
         return ParameterizedTypeName.get(AGGREGATOR_STATE_VECTOR, stateType);
     }
+
+    private boolean isAggState() {
+        return intermediateState.get(0).name().equals("aggstate");
+    }
+
+    private boolean hasPrimitiveState() {
+        return switch (stateType.toString()) {
+            case "org.elasticsearch.compute.aggregation.IntArrayState", "org.elasticsearch.compute.aggregation.LongArrayState",
+                "org.elasticsearch.compute.aggregation.DoubleArrayState" -> true;
+            default -> false;
+        };
+    }
 }

+ 15 - 0
x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/Methods.java

@@ -117,4 +117,19 @@ public class Methods {
         }
         throw new IllegalArgumentException("unknown get method for [" + elementType + "]");
     }
+
+    /**
+     * Returns the name of the method used to get {@code valueType} instances
+     * from vectors or blocks.
+     */
+    static String vectorAccessorName(String elementTypeName) {
+        return switch (elementTypeName) {
+            case "INT" -> "getInt";
+            case "LONG" -> "getLong";
+            case "DOUBLE" -> "getDouble";
+            default -> throw new IllegalArgumentException(
+                "don't know how to fetch primitive values from " + elementTypeName + ". define combineStates."
+            );
+        };
+    }
 }

+ 18 - 42
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleArrayState.java

@@ -12,15 +12,12 @@ import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.DoubleArray;
 import org.elasticsearch.compute.ann.Experimental;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.core.Releasables;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.nio.ByteOrder;
-import java.util.Objects;
-
 /**
  * Aggregator state for an array of doubles.
  * This class is generated. Do not edit it.
@@ -110,9 +107,23 @@ final class DoubleArrayState implements AggregatorState<DoubleArrayState> {
         }
     }
 
+    /** Extracts an intermediate view of the contents of this state.  */
+    void toIntermediate(Block[] blocks, int offset, IntVector selected) {
+        assert blocks.length >= offset + 2;
+        var valuesBuilder = DoubleBlock.newBlockBuilder(selected.getPositionCount());
+        var nullsBuilder = BooleanBlock.newBlockBuilder(selected.getPositionCount());
+        for (int i = 0; i < selected.getPositionCount(); i++) {
+            int group = selected.getInt(i);
+            valuesBuilder.appendDouble(values.get(group));
+            nullsBuilder.appendBoolean(hasValue(group));
+        }
+        blocks[offset + 0] = valuesBuilder.build();
+        blocks[offset + 1] = nullsBuilder.build();
+    }
+
     @Override
     public long getEstimatedSize() {
-        return Long.BYTES + (largestIndex + 1L) * Double.BYTES + LongArrayState.estimateSerializeSize(nonNulls);
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -122,41 +133,6 @@ final class DoubleArrayState implements AggregatorState<DoubleArrayState> {
 
     @Override
     public AggregatorStateSerializer<DoubleArrayState> serializer() {
-        return new DoubleArrayStateSerializer();
-    }
-
-    private static class DoubleArrayStateSerializer implements AggregatorStateSerializer<DoubleArrayState> {
-        private static final VarHandle lengthHandle = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
-        private static final VarHandle valueHandle = MethodHandles.byteArrayViewVarHandle(double[].class, ByteOrder.BIG_ENDIAN);
-
-        @Override
-        public int size() {
-            return Double.BYTES;
-        }
-
-        @Override
-        public int serialize(DoubleArrayState state, byte[] ba, int offset, org.elasticsearch.compute.data.IntVector selected) {
-            lengthHandle.set(ba, offset, selected.getPositionCount());
-            offset += Long.BYTES;
-            for (int i = 0; i < selected.getPositionCount(); i++) {
-                valueHandle.set(ba, offset, state.values.get(selected.getInt(i)));
-                offset += Double.BYTES;
-            }
-            final int valuesBytes = Long.BYTES + (Double.BYTES * selected.getPositionCount());
-            return valuesBytes + LongArrayState.serializeBitArray(state.nonNulls, ba, offset);
-        }
-
-        @Override
-        public void deserialize(DoubleArrayState state, byte[] ba, int offset) {
-            Objects.requireNonNull(state);
-            int positions = (int) (long) lengthHandle.get(ba, offset);
-            offset += Long.BYTES;
-            for (int i = 0; i < positions; i++) {
-                state.set((double) valueHandle.get(ba, offset), i);
-                offset += Double.BYTES;
-            }
-            state.largestIndex = positions - 1;
-            state.nonNulls = LongArrayState.deseralizeBitArray(state.bigArrays, ba, offset);
-        }
+        throw new UnsupportedOperationException();
     }
 }

+ 12 - 33
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DoubleState.java

@@ -8,12 +8,9 @@
 package org.elasticsearch.compute.aggregation;
 
 import org.elasticsearch.compute.ann.Experimental;
-import org.elasticsearch.compute.data.IntVector;
-
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.nio.ByteOrder;
-import java.util.Objects;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.ConstantBooleanVector;
+import org.elasticsearch.compute.data.ConstantDoubleVector;
 
 /**
  * Aggregator state for a single double.
@@ -48,9 +45,16 @@ final class DoubleState implements AggregatorState<DoubleState> {
         this.seen = seen;
     }
 
+    /** Extracts an intermediate view of the contents of this state.  */
+    void toIntermediate(Block[] blocks, int offset) {
+        assert blocks.length >= offset + 2;
+        blocks[offset + 0] = new ConstantDoubleVector(value, 1).asBlock();
+        blocks[offset + 1] = new ConstantBooleanVector(seen, 1).asBlock();
+    }
+
     @Override
     public long getEstimatedSize() {
-        return Double.BYTES + 1;
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -58,31 +62,6 @@ final class DoubleState implements AggregatorState<DoubleState> {
 
     @Override
     public AggregatorStateSerializer<DoubleState> serializer() {
-        return new DoubleStateSerializer();
-    }
-
-    private static class DoubleStateSerializer implements AggregatorStateSerializer<DoubleState> {
-        private static final VarHandle handle = MethodHandles.byteArrayViewVarHandle(double[].class, ByteOrder.BIG_ENDIAN);
-
-        @Override
-        public int size() {
-            return Double.BYTES + 1;
-        }
-
-        @Override
-        public int serialize(DoubleState state, byte[] ba, int offset, IntVector selected) {
-            assert selected.getPositionCount() == 1;
-            assert selected.getInt(0) == 0;
-            handle.set(ba, offset, state.value);
-            ba[offset + Double.BYTES] = (byte) (state.seen ? 1 : 0);
-            return size(); // number of bytes written
-        }
-
-        @Override
-        public void deserialize(DoubleState state, byte[] ba, int offset) {
-            Objects.requireNonNull(state);
-            state.value = (double) handle.get(ba, offset);
-            state.seen = ba[offset + Double.BYTES] == (byte) 1;
-        }
+        throw new UnsupportedOperationException();
     }
 }

+ 17 - 42
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntArrayState.java

@@ -12,15 +12,11 @@ import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.IntArray;
 import org.elasticsearch.compute.ann.Experimental;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.core.Releasables;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.nio.ByteOrder;
-import java.util.Objects;
-
 /**
  * Aggregator state for an array of ints.
  * This class is generated. Do not edit it.
@@ -110,9 +106,23 @@ final class IntArrayState implements AggregatorState<IntArrayState> {
         }
     }
 
+    /** Extracts an intermediate view of the contents of this state.  */
+    void toIntermediate(Block[] blocks, int offset, IntVector selected) {
+        assert blocks.length >= offset + 2;
+        var valuesBuilder = IntBlock.newBlockBuilder(selected.getPositionCount());
+        var nullsBuilder = BooleanBlock.newBlockBuilder(selected.getPositionCount());
+        for (int i = 0; i < selected.getPositionCount(); i++) {
+            int group = selected.getInt(i);
+            valuesBuilder.appendInt(values.get(group));
+            nullsBuilder.appendBoolean(hasValue(group));
+        }
+        blocks[offset + 0] = valuesBuilder.build();
+        blocks[offset + 1] = nullsBuilder.build();
+    }
+
     @Override
     public long getEstimatedSize() {
-        return Long.BYTES + (largestIndex + 1L) * Integer.BYTES + LongArrayState.estimateSerializeSize(nonNulls);
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -122,41 +132,6 @@ final class IntArrayState implements AggregatorState<IntArrayState> {
 
     @Override
     public AggregatorStateSerializer<IntArrayState> serializer() {
-        return new IntArrayStateSerializer();
-    }
-
-    private static class IntArrayStateSerializer implements AggregatorStateSerializer<IntArrayState> {
-        private static final VarHandle lengthHandle = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
-        private static final VarHandle valueHandle = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.BIG_ENDIAN);
-
-        @Override
-        public int size() {
-            return Integer.BYTES;
-        }
-
-        @Override
-        public int serialize(IntArrayState state, byte[] ba, int offset, org.elasticsearch.compute.data.IntVector selected) {
-            lengthHandle.set(ba, offset, selected.getPositionCount());
-            offset += Long.BYTES;
-            for (int i = 0; i < selected.getPositionCount(); i++) {
-                valueHandle.set(ba, offset, state.values.get(selected.getInt(i)));
-                offset += Integer.BYTES;
-            }
-            final int valuesBytes = Long.BYTES + (Integer.BYTES * selected.getPositionCount());
-            return valuesBytes + LongArrayState.serializeBitArray(state.nonNulls, ba, offset);
-        }
-
-        @Override
-        public void deserialize(IntArrayState state, byte[] ba, int offset) {
-            Objects.requireNonNull(state);
-            int positions = (int) (long) lengthHandle.get(ba, offset);
-            offset += Long.BYTES;
-            for (int i = 0; i < positions; i++) {
-                state.set((int) valueHandle.get(ba, offset), i);
-                offset += Integer.BYTES;
-            }
-            state.largestIndex = positions - 1;
-            state.nonNulls = LongArrayState.deseralizeBitArray(state.bigArrays, ba, offset);
-        }
+        throw new UnsupportedOperationException();
     }
 }

+ 12 - 33
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/IntState.java

@@ -8,12 +8,9 @@
 package org.elasticsearch.compute.aggregation;
 
 import org.elasticsearch.compute.ann.Experimental;
-import org.elasticsearch.compute.data.IntVector;
-
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.nio.ByteOrder;
-import java.util.Objects;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.ConstantBooleanVector;
+import org.elasticsearch.compute.data.ConstantIntVector;
 
 /**
  * Aggregator state for a single int.
@@ -48,9 +45,16 @@ final class IntState implements AggregatorState<IntState> {
         this.seen = seen;
     }
 
+    /** Extracts an intermediate view of the contents of this state.  */
+    void toIntermediate(Block[] blocks, int offset) {
+        assert blocks.length >= offset + 2;
+        blocks[offset + 0] = new ConstantIntVector(value, 1).asBlock();
+        blocks[offset + 1] = new ConstantBooleanVector(seen, 1).asBlock();
+    }
+
     @Override
     public long getEstimatedSize() {
-        return Integer.BYTES + 1;
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -58,31 +62,6 @@ final class IntState implements AggregatorState<IntState> {
 
     @Override
     public AggregatorStateSerializer<IntState> serializer() {
-        return new IntStateSerializer();
-    }
-
-    private static class IntStateSerializer implements AggregatorStateSerializer<IntState> {
-        private static final VarHandle handle = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.BIG_ENDIAN);
-
-        @Override
-        public int size() {
-            return Integer.BYTES + 1;
-        }
-
-        @Override
-        public int serialize(IntState state, byte[] ba, int offset, IntVector selected) {
-            assert selected.getPositionCount() == 1;
-            assert selected.getInt(0) == 0;
-            handle.set(ba, offset, state.value);
-            ba[offset + Integer.BYTES] = (byte) (state.seen ? 1 : 0);
-            return size(); // number of bytes written
-        }
-
-        @Override
-        public void deserialize(IntState state, byte[] ba, int offset) {
-            Objects.requireNonNull(state);
-            state.value = (int) handle.get(ba, offset);
-            state.seen = ba[offset + Integer.BYTES] == (byte) 1;
-        }
+        throw new UnsupportedOperationException();
     }
 }

+ 18 - 79
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongArrayState.java

@@ -12,15 +12,12 @@ import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.LongArray;
 import org.elasticsearch.compute.ann.Experimental;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.core.Releasables;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.nio.ByteOrder;
-import java.util.Objects;
-
 /**
  * Aggregator state for an array of longs.
  * This class is generated. Do not edit it.
@@ -121,9 +118,23 @@ final class LongArrayState implements AggregatorState<LongArrayState> {
         }
     }
 
+    /** Extracts an intermediate view of the contents of this state.  */
+    void toIntermediate(Block[] blocks, int offset, IntVector selected) {
+        assert blocks.length >= offset + 2;
+        var valuesBuilder = LongBlock.newBlockBuilder(selected.getPositionCount());
+        var nullsBuilder = BooleanBlock.newBlockBuilder(selected.getPositionCount());
+        for (int i = 0; i < selected.getPositionCount(); i++) {
+            int group = selected.getInt(i);
+            valuesBuilder.appendLong(values.get(group));
+            nullsBuilder.appendBoolean(hasValue(group));
+        }
+        blocks[offset + 0] = valuesBuilder.build();
+        blocks[offset + 1] = nullsBuilder.build();
+    }
+
     @Override
     public long getEstimatedSize() {
-        return Long.BYTES + (largestIndex + 1L) * Long.BYTES + LongArrayState.estimateSerializeSize(nonNulls);
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -133,78 +144,6 @@ final class LongArrayState implements AggregatorState<LongArrayState> {
 
     @Override
     public AggregatorStateSerializer<LongArrayState> serializer() {
-        return new LongArrayStateSerializer();
-    }
-
-    private static final VarHandle longHandle = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
-
-    static int serializeBitArray(BitArray bits, byte[] ba, int offset) {
-        if (bits == null) {
-            longHandle.set(ba, offset, 0);
-            return Long.BYTES;
-        }
-        final LongArray array = bits.getBits();
-        longHandle.set(ba, offset, array.size());
-        offset += Long.BYTES;
-        for (long i = 0; i < array.size(); i++) {
-            longHandle.set(ba, offset, array.get(i));
-        }
-        return Long.BYTES + Math.toIntExact(array.size() * Long.BYTES);
-    }
-
-    static BitArray deseralizeBitArray(BigArrays bigArrays, byte[] ba, int offset) {
-        long size = (long) longHandle.get(ba, offset);
-        if (size == 0) {
-            return null;
-        } else {
-            offset += Long.BYTES;
-            final LongArray array = bigArrays.newLongArray(size);
-            for (long i = 0; i < size; i++) {
-                array.set(i, (long) longHandle.get(ba, offset));
-            }
-            return new BitArray(bigArrays, array);
-        }
-    }
-
-    static int estimateSerializeSize(BitArray bits) {
-        if (bits == null) {
-            return Long.BYTES;
-        }
-        return Long.BYTES + Math.toIntExact(bits.getBits().size() * Long.BYTES);
-    }
-
-    private static class LongArrayStateSerializer implements AggregatorStateSerializer<LongArrayState> {
-        private static final VarHandle lengthHandle = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
-        private static final VarHandle valueHandle = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
-
-        @Override
-        public int size() {
-            return Long.BYTES;
-        }
-
-        @Override
-        public int serialize(LongArrayState state, byte[] ba, int offset, org.elasticsearch.compute.data.IntVector selected) {
-            lengthHandle.set(ba, offset, selected.getPositionCount());
-            offset += Long.BYTES;
-            for (int i = 0; i < selected.getPositionCount(); i++) {
-                valueHandle.set(ba, offset, state.values.get(selected.getInt(i)));
-                offset += Long.BYTES;
-            }
-            final int valuesBytes = Long.BYTES + (Long.BYTES * selected.getPositionCount());
-            return valuesBytes + LongArrayState.serializeBitArray(state.nonNulls, ba, offset);
-        }
-
-        @Override
-        public void deserialize(LongArrayState state, byte[] ba, int offset) {
-            Objects.requireNonNull(state);
-            int positions = (int) (long) lengthHandle.get(ba, offset);
-            offset += Long.BYTES;
-            for (int i = 0; i < positions; i++) {
-                state.set((long) valueHandle.get(ba, offset), i);
-                offset += Long.BYTES;
-            }
-            state.largestIndex = positions - 1;
-            state.nonNulls = LongArrayState.deseralizeBitArray(state.bigArrays, ba, offset);
-        }
+        throw new UnsupportedOperationException();
     }
 }

+ 12 - 33
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/LongState.java

@@ -8,12 +8,9 @@
 package org.elasticsearch.compute.aggregation;
 
 import org.elasticsearch.compute.ann.Experimental;
-import org.elasticsearch.compute.data.IntVector;
-
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.nio.ByteOrder;
-import java.util.Objects;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.ConstantBooleanVector;
+import org.elasticsearch.compute.data.ConstantLongVector;
 
 /**
  * Aggregator state for a single long.
@@ -48,9 +45,16 @@ final class LongState implements AggregatorState<LongState> {
         this.seen = seen;
     }
 
+    /** Extracts an intermediate view of the contents of this state.  */
+    void toIntermediate(Block[] blocks, int offset) {
+        assert blocks.length >= offset + 2;
+        blocks[offset + 0] = new ConstantLongVector(value, 1).asBlock();
+        blocks[offset + 1] = new ConstantBooleanVector(seen, 1).asBlock();
+    }
+
     @Override
     public long getEstimatedSize() {
-        return Long.BYTES + 1;
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -58,31 +62,6 @@ final class LongState implements AggregatorState<LongState> {
 
     @Override
     public AggregatorStateSerializer<LongState> serializer() {
-        return new LongStateSerializer();
-    }
-
-    private static class LongStateSerializer implements AggregatorStateSerializer<LongState> {
-        private static final VarHandle handle = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
-
-        @Override
-        public int size() {
-            return Long.BYTES + 1;
-        }
-
-        @Override
-        public int serialize(LongState state, byte[] ba, int offset, IntVector selected) {
-            assert selected.getPositionCount() == 1;
-            assert selected.getInt(0) == 0;
-            handle.set(ba, offset, state.value);
-            ba[offset + Long.BYTES] = (byte) (state.seen ? 1 : 0);
-            return size(); // number of bytes written
-        }
-
-        @Override
-        public void deserialize(LongState state, byte[] ba, int offset) {
-            Objects.requireNonNull(state);
-            state.value = (long) handle.get(ba, offset);
-            state.seen = ba[offset + Long.BYTES] == (byte) 1;
-        }
+        throw new UnsupportedOperationException();
     }
 }

+ 14 - 23
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxDoubleAggregatorFunction.java

@@ -9,15 +9,13 @@ import java.lang.Override;
 import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link AggregatorFunction} implementation for {@link MaxDoubleAggregator}.
@@ -25,7 +23,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class MaxDoubleAggregatorFunction implements AggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("max", ElementType.DOUBLE),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final DoubleState state;
 
@@ -87,29 +86,21 @@ public final class MaxDoubleAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addIntermediateInput(Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<DoubleState> blobVector = (AggregatorStateVector<DoubleState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    DoubleState tmpState = new DoubleState(MaxDoubleAggregator.init());
-    for (int i = 0; i < block.getPositionCount(); i++) {
-      blobVector.get(i, tmpState);
-      state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), tmpState.doubleValue()));
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    DoubleVector max = page.<DoubleBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert max.getPositionCount() == 1;
+    assert max.getPositionCount() == seen.getPositionCount();
+    if (seen.getBoolean(0)) {
+      state.doubleValue(MaxDoubleAggregator.combine(state.doubleValue(), max.getDouble(0)));
+      state.seen(true);
     }
-    state.seen(state.seen() || tmpState.seen());
-    tmpState.close();
   }
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset) {
-    AggregatorStateVector.Builder<AggregatorStateVector<DoubleState>, DoubleState> builder =
-        AggregatorStateVector.builderOfAggregatorState(DoubleState.class, state.getEstimatedSize());
-    builder.add(state, IntVector.range(0, 1));
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset);
   }
 
   @Override

+ 12 - 20
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxDoubleGroupingAggregatorFunction.java

@@ -10,8 +10,9 @@ import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.ElementType;
@@ -19,7 +20,6 @@ import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link GroupingAggregatorFunction} implementation for {@link MaxDoubleAggregator}.
@@ -27,7 +27,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class MaxDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("max", ElementType.DOUBLE),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final DoubleArrayState state;
 
@@ -169,25 +170,19 @@ public final class MaxDoubleGroupingAggregatorFunction implements GroupingAggreg
 
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<DoubleArrayState> blobVector = (AggregatorStateVector<DoubleArrayState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    DoubleArrayState inState = new DoubleArrayState(bigArrays, MaxDoubleAggregator.init());
-    blobVector.get(0, inState);
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    DoubleVector max = page.<DoubleBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert max.getPositionCount() == seen.getPositionCount();
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      if (inState.hasValue(position)) {
-        state.set(MaxDoubleAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (seen.getBoolean(position)) {
+        state.set(MaxDoubleAggregator.combine(state.getOrDefault(groupId), max.getDouble(position)), groupId);
       } else {
         state.putNull(groupId);
       }
     }
-    inState.close();
   }
 
   @Override
@@ -205,10 +200,7 @@ public final class MaxDoubleGroupingAggregatorFunction implements GroupingAggreg
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
-    AggregatorStateVector.Builder<AggregatorStateVector<DoubleArrayState>, DoubleArrayState> builder =
-        AggregatorStateVector.builderOfAggregatorState(DoubleArrayState.class, state.getEstimatedSize());
-    builder.add(state, selected);
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset, selected);
   }
 
   @Override

+ 14 - 22
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxIntAggregatorFunction.java

@@ -9,14 +9,13 @@ import java.lang.Override;
 import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link AggregatorFunction} implementation for {@link MaxIntAggregator}.
@@ -24,7 +23,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class MaxIntAggregatorFunction implements AggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("max", ElementType.INT),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final IntState state;
 
@@ -86,29 +86,21 @@ public final class MaxIntAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addIntermediateInput(Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<IntState> blobVector = (AggregatorStateVector<IntState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    IntState tmpState = new IntState(MaxIntAggregator.init());
-    for (int i = 0; i < block.getPositionCount(); i++) {
-      blobVector.get(i, tmpState);
-      state.intValue(MaxIntAggregator.combine(state.intValue(), tmpState.intValue()));
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    IntVector max = page.<IntBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert max.getPositionCount() == 1;
+    assert max.getPositionCount() == seen.getPositionCount();
+    if (seen.getBoolean(0)) {
+      state.intValue(MaxIntAggregator.combine(state.intValue(), max.getInt(0)));
+      state.seen(true);
     }
-    state.seen(state.seen() || tmpState.seen());
-    tmpState.close();
   }
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset) {
-    AggregatorStateVector.Builder<AggregatorStateVector<IntState>, IntState> builder =
-        AggregatorStateVector.builderOfAggregatorState(IntState.class, state.getEstimatedSize());
-    builder.add(state, IntVector.range(0, 1));
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset);
   }
 
   @Override

+ 12 - 20
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxIntGroupingAggregatorFunction.java

@@ -10,15 +10,15 @@ import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link GroupingAggregatorFunction} implementation for {@link MaxIntAggregator}.
@@ -26,7 +26,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class MaxIntGroupingAggregatorFunction implements GroupingAggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("max", ElementType.INT),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final IntArrayState state;
 
@@ -168,25 +169,19 @@ public final class MaxIntGroupingAggregatorFunction implements GroupingAggregato
 
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<IntArrayState> blobVector = (AggregatorStateVector<IntArrayState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    IntArrayState inState = new IntArrayState(bigArrays, MaxIntAggregator.init());
-    blobVector.get(0, inState);
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    IntVector max = page.<IntBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert max.getPositionCount() == seen.getPositionCount();
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      if (inState.hasValue(position)) {
-        state.set(MaxIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (seen.getBoolean(position)) {
+        state.set(MaxIntAggregator.combine(state.getOrDefault(groupId), max.getInt(position)), groupId);
       } else {
         state.putNull(groupId);
       }
     }
-    inState.close();
   }
 
   @Override
@@ -204,10 +199,7 @@ public final class MaxIntGroupingAggregatorFunction implements GroupingAggregato
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
-    AggregatorStateVector.Builder<AggregatorStateVector<IntArrayState>, IntArrayState> builder =
-        AggregatorStateVector.builderOfAggregatorState(IntArrayState.class, state.getEstimatedSize());
-    builder.add(state, selected);
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset, selected);
   }
 
   @Override

+ 14 - 23
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxLongAggregatorFunction.java

@@ -9,15 +9,13 @@ import java.lang.Override;
 import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link AggregatorFunction} implementation for {@link MaxLongAggregator}.
@@ -25,7 +23,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class MaxLongAggregatorFunction implements AggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("max", ElementType.LONG),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final LongState state;
 
@@ -87,29 +86,21 @@ public final class MaxLongAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addIntermediateInput(Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<LongState> blobVector = (AggregatorStateVector<LongState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    LongState tmpState = new LongState(MaxLongAggregator.init());
-    for (int i = 0; i < block.getPositionCount(); i++) {
-      blobVector.get(i, tmpState);
-      state.longValue(MaxLongAggregator.combine(state.longValue(), tmpState.longValue()));
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    LongVector max = page.<LongBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert max.getPositionCount() == 1;
+    assert max.getPositionCount() == seen.getPositionCount();
+    if (seen.getBoolean(0)) {
+      state.longValue(MaxLongAggregator.combine(state.longValue(), max.getLong(0)));
+      state.seen(true);
     }
-    state.seen(state.seen() || tmpState.seen());
-    tmpState.close();
   }
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset) {
-    AggregatorStateVector.Builder<AggregatorStateVector<LongState>, LongState> builder =
-        AggregatorStateVector.builderOfAggregatorState(LongState.class, state.getEstimatedSize());
-    builder.add(state, IntVector.range(0, 1));
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset);
   }
 
   @Override

+ 12 - 20
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MaxLongGroupingAggregatorFunction.java

@@ -10,14 +10,14 @@ import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link GroupingAggregatorFunction} implementation for {@link MaxLongAggregator}.
@@ -25,7 +25,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class MaxLongGroupingAggregatorFunction implements GroupingAggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("max", ElementType.LONG),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final LongArrayState state;
 
@@ -167,25 +168,19 @@ public final class MaxLongGroupingAggregatorFunction implements GroupingAggregat
 
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<LongArrayState> blobVector = (AggregatorStateVector<LongArrayState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    LongArrayState inState = new LongArrayState(bigArrays, MaxLongAggregator.init());
-    blobVector.get(0, inState);
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    LongVector max = page.<LongBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert max.getPositionCount() == seen.getPositionCount();
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      if (inState.hasValue(position)) {
-        state.set(MaxLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (seen.getBoolean(position)) {
+        state.set(MaxLongAggregator.combine(state.getOrDefault(groupId), max.getLong(position)), groupId);
       } else {
         state.putNull(groupId);
       }
     }
-    inState.close();
   }
 
   @Override
@@ -203,10 +198,7 @@ public final class MaxLongGroupingAggregatorFunction implements GroupingAggregat
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
-    AggregatorStateVector.Builder<AggregatorStateVector<LongArrayState>, LongArrayState> builder =
-        AggregatorStateVector.builderOfAggregatorState(LongArrayState.class, state.getEstimatedSize());
-    builder.add(state, selected);
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset, selected);
   }
 
   @Override

+ 14 - 23
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinDoubleAggregatorFunction.java

@@ -9,15 +9,13 @@ import java.lang.Override;
 import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link AggregatorFunction} implementation for {@link MinDoubleAggregator}.
@@ -25,7 +23,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class MinDoubleAggregatorFunction implements AggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("min", ElementType.DOUBLE),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final DoubleState state;
 
@@ -87,29 +86,21 @@ public final class MinDoubleAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addIntermediateInput(Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<DoubleState> blobVector = (AggregatorStateVector<DoubleState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    DoubleState tmpState = new DoubleState(MinDoubleAggregator.init());
-    for (int i = 0; i < block.getPositionCount(); i++) {
-      blobVector.get(i, tmpState);
-      state.doubleValue(MinDoubleAggregator.combine(state.doubleValue(), tmpState.doubleValue()));
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    DoubleVector min = page.<DoubleBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert min.getPositionCount() == 1;
+    assert min.getPositionCount() == seen.getPositionCount();
+    if (seen.getBoolean(0)) {
+      state.doubleValue(MinDoubleAggregator.combine(state.doubleValue(), min.getDouble(0)));
+      state.seen(true);
     }
-    state.seen(state.seen() || tmpState.seen());
-    tmpState.close();
   }
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset) {
-    AggregatorStateVector.Builder<AggregatorStateVector<DoubleState>, DoubleState> builder =
-        AggregatorStateVector.builderOfAggregatorState(DoubleState.class, state.getEstimatedSize());
-    builder.add(state, IntVector.range(0, 1));
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset);
   }
 
   @Override

+ 12 - 20
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinDoubleGroupingAggregatorFunction.java

@@ -10,8 +10,9 @@ import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.ElementType;
@@ -19,7 +20,6 @@ import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link GroupingAggregatorFunction} implementation for {@link MinDoubleAggregator}.
@@ -27,7 +27,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class MinDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("min", ElementType.DOUBLE),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final DoubleArrayState state;
 
@@ -169,25 +170,19 @@ public final class MinDoubleGroupingAggregatorFunction implements GroupingAggreg
 
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<DoubleArrayState> blobVector = (AggregatorStateVector<DoubleArrayState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    DoubleArrayState inState = new DoubleArrayState(bigArrays, MinDoubleAggregator.init());
-    blobVector.get(0, inState);
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    DoubleVector min = page.<DoubleBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert min.getPositionCount() == seen.getPositionCount();
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      if (inState.hasValue(position)) {
-        state.set(MinDoubleAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (seen.getBoolean(position)) {
+        state.set(MinDoubleAggregator.combine(state.getOrDefault(groupId), min.getDouble(position)), groupId);
       } else {
         state.putNull(groupId);
       }
     }
-    inState.close();
   }
 
   @Override
@@ -205,10 +200,7 @@ public final class MinDoubleGroupingAggregatorFunction implements GroupingAggreg
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
-    AggregatorStateVector.Builder<AggregatorStateVector<DoubleArrayState>, DoubleArrayState> builder =
-        AggregatorStateVector.builderOfAggregatorState(DoubleArrayState.class, state.getEstimatedSize());
-    builder.add(state, selected);
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset, selected);
   }
 
   @Override

+ 14 - 22
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinIntAggregatorFunction.java

@@ -9,14 +9,13 @@ import java.lang.Override;
 import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link AggregatorFunction} implementation for {@link MinIntAggregator}.
@@ -24,7 +23,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class MinIntAggregatorFunction implements AggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("min", ElementType.INT),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final IntState state;
 
@@ -86,29 +86,21 @@ public final class MinIntAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addIntermediateInput(Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<IntState> blobVector = (AggregatorStateVector<IntState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    IntState tmpState = new IntState(MinIntAggregator.init());
-    for (int i = 0; i < block.getPositionCount(); i++) {
-      blobVector.get(i, tmpState);
-      state.intValue(MinIntAggregator.combine(state.intValue(), tmpState.intValue()));
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    IntVector min = page.<IntBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert min.getPositionCount() == 1;
+    assert min.getPositionCount() == seen.getPositionCount();
+    if (seen.getBoolean(0)) {
+      state.intValue(MinIntAggregator.combine(state.intValue(), min.getInt(0)));
+      state.seen(true);
     }
-    state.seen(state.seen() || tmpState.seen());
-    tmpState.close();
   }
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset) {
-    AggregatorStateVector.Builder<AggregatorStateVector<IntState>, IntState> builder =
-        AggregatorStateVector.builderOfAggregatorState(IntState.class, state.getEstimatedSize());
-    builder.add(state, IntVector.range(0, 1));
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset);
   }
 
   @Override

+ 12 - 20
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinIntGroupingAggregatorFunction.java

@@ -10,15 +10,15 @@ import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link GroupingAggregatorFunction} implementation for {@link MinIntAggregator}.
@@ -26,7 +26,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class MinIntGroupingAggregatorFunction implements GroupingAggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("min", ElementType.INT),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final IntArrayState state;
 
@@ -168,25 +169,19 @@ public final class MinIntGroupingAggregatorFunction implements GroupingAggregato
 
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<IntArrayState> blobVector = (AggregatorStateVector<IntArrayState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    IntArrayState inState = new IntArrayState(bigArrays, MinIntAggregator.init());
-    blobVector.get(0, inState);
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    IntVector min = page.<IntBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert min.getPositionCount() == seen.getPositionCount();
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      if (inState.hasValue(position)) {
-        state.set(MinIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (seen.getBoolean(position)) {
+        state.set(MinIntAggregator.combine(state.getOrDefault(groupId), min.getInt(position)), groupId);
       } else {
         state.putNull(groupId);
       }
     }
-    inState.close();
   }
 
   @Override
@@ -204,10 +199,7 @@ public final class MinIntGroupingAggregatorFunction implements GroupingAggregato
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
-    AggregatorStateVector.Builder<AggregatorStateVector<IntArrayState>, IntArrayState> builder =
-        AggregatorStateVector.builderOfAggregatorState(IntArrayState.class, state.getEstimatedSize());
-    builder.add(state, selected);
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset, selected);
   }
 
   @Override

+ 5 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinLongAggregatorFunction.java

@@ -92,12 +92,15 @@ public final class MinLongAggregatorFunction implements AggregatorFunction {
     BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
     assert min.getPositionCount() == 1;
     assert min.getPositionCount() == seen.getPositionCount();
-    MinLongAggregator.combineIntermediate(state, min, seen);
+    if (seen.getBoolean(0)) {
+      state.longValue(MinLongAggregator.combine(state.longValue(), min.getLong(0)));
+      state.seen(true);
+    }
   }
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset) {
-    MinLongAggregator.evaluateIntermediate(state, blocks, offset);
+    state.toIntermediate(blocks, offset);
   }
 
   @Override

+ 9 - 2
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/MinLongGroupingAggregatorFunction.java

@@ -173,7 +173,14 @@ public final class MinLongGroupingAggregatorFunction implements GroupingAggregat
     LongVector min = page.<LongBlock>getBlock(channels.get(0)).asVector();
     BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
     assert min.getPositionCount() == seen.getPositionCount();
-    MinLongAggregator.combineIntermediate(groupIdVector, state, min, seen);
+    for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
+      int groupId = Math.toIntExact(groupIdVector.getLong(position));
+      if (seen.getBoolean(position)) {
+        state.set(MinLongAggregator.combine(state.getOrDefault(groupId), min.getLong(position)), groupId);
+      } else {
+        state.putNull(groupId);
+      }
+    }
   }
 
   @Override
@@ -191,7 +198,7 @@ public final class MinLongGroupingAggregatorFunction implements GroupingAggregat
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
-    MinLongAggregator.evaluateIntermediate(state, blocks, offset, selected);
+    state.toIntermediate(blocks, offset, selected);
   }
 
   @Override

+ 1 - 1
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunction.java

@@ -99,7 +99,7 @@ public final class SumDoubleAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset) {
-    SumDoubleAggregator.evaluateIntermediate(state, blocks, offset);
+    state.toIntermediate(blocks, offset);
   }
 
   @Override

+ 1 - 1
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumDoubleGroupingAggregatorFunction.java

@@ -195,7 +195,7 @@ public final class SumDoubleGroupingAggregatorFunction implements GroupingAggreg
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
-    SumDoubleAggregator.evaluateIntermediate(state, blocks, offset, selected);
+    state.toIntermediate(blocks, offset, selected);
   }
 
   @Override

+ 15 - 22
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumIntAggregatorFunction.java

@@ -9,15 +9,15 @@ import java.lang.Override;
 import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link AggregatorFunction} implementation for {@link SumIntAggregator}.
@@ -25,7 +25,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class SumIntAggregatorFunction implements AggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("sum", ElementType.LONG),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final LongState state;
 
@@ -87,29 +88,21 @@ public final class SumIntAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addIntermediateInput(Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<LongState> blobVector = (AggregatorStateVector<LongState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    LongState tmpState = new LongState(SumIntAggregator.init());
-    for (int i = 0; i < block.getPositionCount(); i++) {
-      blobVector.get(i, tmpState);
-      SumIntAggregator.combineStates(state, tmpState);
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    LongVector sum = page.<LongBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert sum.getPositionCount() == 1;
+    assert sum.getPositionCount() == seen.getPositionCount();
+    if (seen.getBoolean(0)) {
+      state.longValue(SumIntAggregator.combine(state.longValue(), sum.getLong(0)));
+      state.seen(true);
     }
-    state.seen(state.seen() || tmpState.seen());
-    tmpState.close();
   }
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset) {
-    AggregatorStateVector.Builder<AggregatorStateVector<LongState>, LongState> builder =
-        AggregatorStateVector.builderOfAggregatorState(LongState.class, state.getEstimatedSize());
-    builder.add(state, IntVector.range(0, 1));
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset);
   }
 
   @Override

+ 20 - 20
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumIntGroupingAggregatorFunction.java

@@ -10,15 +10,15 @@ import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link GroupingAggregatorFunction} implementation for {@link SumIntAggregator}.
@@ -26,7 +26,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class SumIntGroupingAggregatorFunction implements GroupingAggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("sum", ElementType.LONG),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final LongArrayState state;
 
@@ -168,21 +169,19 @@ public final class SumIntGroupingAggregatorFunction implements GroupingAggregato
 
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<LongArrayState> blobVector = (AggregatorStateVector<LongArrayState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    LongArrayState inState = new LongArrayState(bigArrays, SumIntAggregator.init());
-    blobVector.get(0, inState);
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    LongVector sum = page.<LongBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert sum.getPositionCount() == seen.getPositionCount();
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      SumIntAggregator.combineStates(state, groupId, inState, position);
+      if (seen.getBoolean(position)) {
+        state.set(SumIntAggregator.combine(state.getOrDefault(groupId), sum.getLong(position)), groupId);
+      } else {
+        state.putNull(groupId);
+      }
     }
-    inState.close();
   }
 
   @Override
@@ -191,15 +190,16 @@ public final class SumIntGroupingAggregatorFunction implements GroupingAggregato
       throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
     }
     LongArrayState inState = ((SumIntGroupingAggregatorFunction) input).state;
-    SumIntAggregator.combineStates(state, groupId, inState, position);
+    if (inState.hasValue(position)) {
+      state.set(SumIntAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+    } else {
+      state.putNull(groupId);
+    }
   }
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
-    AggregatorStateVector.Builder<AggregatorStateVector<LongArrayState>, LongArrayState> builder =
-        AggregatorStateVector.builderOfAggregatorState(LongArrayState.class, state.getEstimatedSize());
-    builder.add(state, selected);
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset, selected);
   }
 
   @Override

+ 14 - 23
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongAggregatorFunction.java

@@ -9,15 +9,13 @@ import java.lang.Override;
 import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link AggregatorFunction} implementation for {@link SumLongAggregator}.
@@ -25,7 +23,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class SumLongAggregatorFunction implements AggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("sum", ElementType.LONG),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final LongState state;
 
@@ -87,29 +86,21 @@ public final class SumLongAggregatorFunction implements AggregatorFunction {
 
   @Override
   public void addIntermediateInput(Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<LongState> blobVector = (AggregatorStateVector<LongState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    LongState tmpState = new LongState(SumLongAggregator.init());
-    for (int i = 0; i < block.getPositionCount(); i++) {
-      blobVector.get(i, tmpState);
-      state.longValue(SumLongAggregator.combine(state.longValue(), tmpState.longValue()));
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    LongVector sum = page.<LongBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert sum.getPositionCount() == 1;
+    assert sum.getPositionCount() == seen.getPositionCount();
+    if (seen.getBoolean(0)) {
+      state.longValue(SumLongAggregator.combine(state.longValue(), sum.getLong(0)));
+      state.seen(true);
     }
-    state.seen(state.seen() || tmpState.seen());
-    tmpState.close();
   }
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset) {
-    AggregatorStateVector.Builder<AggregatorStateVector<LongState>, LongState> builder =
-        AggregatorStateVector.builderOfAggregatorState(LongState.class, state.getEstimatedSize());
-    builder.add(state, IntVector.range(0, 1));
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset);
   }
 
   @Override

+ 12 - 20
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/SumLongGroupingAggregatorFunction.java

@@ -10,14 +10,14 @@ import java.lang.String;
 import java.lang.StringBuilder;
 import java.util.List;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.Vector;
 
 /**
  * {@link GroupingAggregatorFunction} implementation for {@link SumLongAggregator}.
@@ -25,7 +25,8 @@ import org.elasticsearch.compute.data.Vector;
  */
 public final class SumLongGroupingAggregatorFunction implements GroupingAggregatorFunction {
   private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
-      new IntermediateStateDesc("aggstate", ElementType.UNKNOWN)  );
+      new IntermediateStateDesc("sum", ElementType.LONG),
+      new IntermediateStateDesc("seen", ElementType.BOOLEAN)  );
 
   private final LongArrayState state;
 
@@ -167,25 +168,19 @@ public final class SumLongGroupingAggregatorFunction implements GroupingAggregat
 
   @Override
   public void addIntermediateInput(LongVector groupIdVector, Page page) {
-    Block block = page.getBlock(channels.get(0));
-    Vector vector = block.asVector();
-    if (vector == null || vector instanceof AggregatorStateVector == false) {
-      throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-    }
-    @SuppressWarnings("unchecked") AggregatorStateVector<LongArrayState> blobVector = (AggregatorStateVector<LongArrayState>) vector;
-    // TODO exchange big arrays directly without funny serialization - no more copying
-    BigArrays bigArrays = BigArrays.NON_RECYCLING_INSTANCE;
-    LongArrayState inState = new LongArrayState(bigArrays, SumLongAggregator.init());
-    blobVector.get(0, inState);
+    assert channels.size() == intermediateBlockCount();
+    assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+    LongVector sum = page.<LongBlock>getBlock(channels.get(0)).asVector();
+    BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+    assert sum.getPositionCount() == seen.getPositionCount();
     for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
       int groupId = Math.toIntExact(groupIdVector.getLong(position));
-      if (inState.hasValue(position)) {
-        state.set(SumLongAggregator.combine(state.getOrDefault(groupId), inState.get(position)), groupId);
+      if (seen.getBoolean(position)) {
+        state.set(SumLongAggregator.combine(state.getOrDefault(groupId), sum.getLong(position)), groupId);
       } else {
         state.putNull(groupId);
       }
     }
-    inState.close();
   }
 
   @Override
@@ -203,10 +198,7 @@ public final class SumLongGroupingAggregatorFunction implements GroupingAggregat
 
   @Override
   public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
-    AggregatorStateVector.Builder<AggregatorStateVector<LongArrayState>, LongArrayState> builder =
-        AggregatorStateVector.builderOfAggregatorState(LongArrayState.class, state.getEstimatedSize());
-    builder.add(state, selected);
-    blocks[offset] = builder.build().asBlock();
+    state.toIntermediate(blocks, offset, selected);
   }
 
   @Override

+ 18 - 22
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountAggregatorFunction.java

@@ -9,10 +9,12 @@ package org.elasticsearch.compute.aggregation;
 
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.ann.Experimental;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
+import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
 
 import java.util.List;
@@ -38,8 +40,13 @@ public class CountAggregatorFunction implements AggregatorFunction {
         };
     }
 
+    private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
+        new IntermediateStateDesc("count", ElementType.LONG),
+        new IntermediateStateDesc("seen", ElementType.BOOLEAN)
+    );
+
     public static List<IntermediateStateDesc> intermediateStateDesc() {
-        return IntermediateStateDesc.AGG_STATE;
+        return INTERMEDIATE_STATE_DESC;
     }
 
     private final LongState state;
@@ -68,29 +75,18 @@ public class CountAggregatorFunction implements AggregatorFunction {
 
     @Override
     public void addIntermediateInput(Page page) {
-        Block block = page.getBlock(channels.get(0));
-        if (block.asVector() != null && block.asVector() instanceof AggregatorStateVector) {
-            @SuppressWarnings("unchecked")
-            AggregatorStateVector<LongState> blobVector = (AggregatorStateVector) block.asVector();
-            LongState state = this.state;
-            LongState tmpState = new LongState();
-            for (int i = 0; i < block.getPositionCount(); i++) {
-                blobVector.get(i, tmpState);
-                state.longValue(state.longValue() + tmpState.longValue());
-            }
-        } else {
-            throw new RuntimeException("expected AggregatorStateBlock, got:" + block);
-        }
+        assert channels.size() == intermediateBlockCount();
+        assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+        LongVector count = page.<LongBlock>getBlock(channels.get(0)).asVector();
+        BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+        assert count.getPositionCount() == 1;
+        assert count.getPositionCount() == seen.getPositionCount();
+        state.longValue(state.longValue() + count.getLong(0));
     }
 
     @Override
     public void evaluateIntermediate(Block[] blocks, int offset) {
-        AggregatorStateVector.Builder<AggregatorStateVector<LongState>, LongState> builder = AggregatorStateVector.builderOfAggregatorState(
-            LongState.class,
-            state.getEstimatedSize()
-        );
-        builder.add(state, IntVector.range(0, 1));
-        blocks[offset] = builder.build().asBlock();
+        state.toIntermediate(blocks, offset);
     }
 
     @Override

+ 17 - 21
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunction.java

@@ -9,8 +9,10 @@ package org.elasticsearch.compute.aggregation;
 
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.ann.Experimental;
-import org.elasticsearch.compute.data.AggregatorStateVector;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BooleanVector;
+import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
@@ -22,6 +24,11 @@ import java.util.List;
 @Experimental
 public class CountGroupingAggregatorFunction implements GroupingAggregatorFunction {
 
+    private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
+        new IntermediateStateDesc("count", ElementType.LONG),
+        new IntermediateStateDesc("seen", ElementType.BOOLEAN)
+    );
+
     private final LongArrayState state;
     private final List<Integer> channels;
 
@@ -30,7 +37,7 @@ public class CountGroupingAggregatorFunction implements GroupingAggregatorFuncti
     }
 
     public static List<IntermediateStateDesc> intermediateStateDesc() {
-        return IntermediateStateDesc.AGG_STATE;
+        return INTERMEDIATE_STATE_DESC;
     }
 
     private CountGroupingAggregatorFunction(List<Integer> channels, LongArrayState state) {
@@ -120,21 +127,13 @@ public class CountGroupingAggregatorFunction implements GroupingAggregatorFuncti
 
     @Override
     public void addIntermediateInput(LongVector groupIdVector, Page page) {
-        Block block = page.getBlock(channels.get(0));
-        Vector vector = block.asVector();
-        if (vector instanceof AggregatorStateVector) {
-            @SuppressWarnings("unchecked")
-            AggregatorStateVector<LongArrayState> blobBlock = (AggregatorStateVector<LongArrayState>) vector;
-            // TODO exchange big arrays directly without funny serialization - no more copying
-            LongArrayState tmpState = new LongArrayState(BigArrays.NON_RECYCLING_INSTANCE, 0);
-            blobBlock.get(0, tmpState);
-            final int positions = groupIdVector.getPositionCount();
-            final LongArrayState state = this.state;
-            for (int i = 0; i < positions; i++) {
-                state.increment(tmpState.get(i), Math.toIntExact(groupIdVector.getLong(i)));
-            }
-        } else {
-            throw new RuntimeException("expected AggregatorStateVector, got:" + block);
+        assert channels.size() == intermediateBlockCount();
+        assert page.getBlockCount() >= channels.get(0) + intermediateStateDesc().size();
+        LongVector count = page.<LongBlock>getBlock(channels.get(0)).asVector();
+        BooleanVector seen = page.<BooleanBlock>getBlock(channels.get(1)).asVector();
+        assert count.getPositionCount() == seen.getPositionCount();
+        for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
+            state.increment(count.getLong(position), Math.toIntExact(groupIdVector.getLong(position)));
         }
     }
 
@@ -149,10 +148,7 @@ public class CountGroupingAggregatorFunction implements GroupingAggregatorFuncti
 
     @Override
     public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
-        AggregatorStateVector.Builder<AggregatorStateVector<LongArrayState>, LongArrayState> builder = AggregatorStateVector
-            .builderOfAggregatorState(LongArrayState.class, state.getEstimatedSize());
-        builder.add(state, selected);
-        blocks[offset] = builder.build().asBlock();
+        state.toIntermediate(blocks, offset, selected);
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MaxDoubleAggregator.java

@@ -11,7 +11,7 @@ import org.elasticsearch.compute.ann.Aggregator;
 import org.elasticsearch.compute.ann.GroupingAggregator;
 import org.elasticsearch.compute.ann.IntermediateState;
 
-@Aggregator({ @IntermediateState(name = "aggstate", type = "UNKNOWN") })
+@Aggregator({ @IntermediateState(name = "max", type = "DOUBLE"), @IntermediateState(name = "seen", type = "BOOLEAN") })
 @GroupingAggregator
 class MaxDoubleAggregator {
 

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MaxIntAggregator.java

@@ -11,7 +11,7 @@ import org.elasticsearch.compute.ann.Aggregator;
 import org.elasticsearch.compute.ann.GroupingAggregator;
 import org.elasticsearch.compute.ann.IntermediateState;
 
-@Aggregator({ @IntermediateState(name = "aggstate", type = "UNKNOWN") })
+@Aggregator({ @IntermediateState(name = "max", type = "INT"), @IntermediateState(name = "seen", type = "BOOLEAN") })
 @GroupingAggregator
 class MaxIntAggregator {
 

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MaxLongAggregator.java

@@ -11,7 +11,7 @@ import org.elasticsearch.compute.ann.Aggregator;
 import org.elasticsearch.compute.ann.GroupingAggregator;
 import org.elasticsearch.compute.ann.IntermediateState;
 
-@Aggregator({ @IntermediateState(name = "aggstate", type = "UNKNOWN") })
+@Aggregator({ @IntermediateState(name = "max", type = "LONG"), @IntermediateState(name = "seen", type = "BOOLEAN") })
 @GroupingAggregator
 class MaxLongAggregator {
 

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MinDoubleAggregator.java

@@ -11,7 +11,7 @@ import org.elasticsearch.compute.ann.Aggregator;
 import org.elasticsearch.compute.ann.GroupingAggregator;
 import org.elasticsearch.compute.ann.IntermediateState;
 
-@Aggregator({ @IntermediateState(name = "aggstate", type = "UNKNOWN") })
+@Aggregator({ @IntermediateState(name = "min", type = "DOUBLE"), @IntermediateState(name = "seen", type = "BOOLEAN") })
 @GroupingAggregator
 class MinDoubleAggregator {
 

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MinIntAggregator.java

@@ -11,7 +11,7 @@ import org.elasticsearch.compute.ann.Aggregator;
 import org.elasticsearch.compute.ann.GroupingAggregator;
 import org.elasticsearch.compute.ann.IntermediateState;
 
-@Aggregator({ @IntermediateState(name = "aggstate", type = "UNKNOWN") })
+@Aggregator({ @IntermediateState(name = "min", type = "INT"), @IntermediateState(name = "seen", type = "BOOLEAN") })
 @GroupingAggregator
 class MinIntAggregator {
 

+ 0 - 44
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/MinLongAggregator.java

@@ -10,14 +10,6 @@ package org.elasticsearch.compute.aggregation;
 import org.elasticsearch.compute.ann.Aggregator;
 import org.elasticsearch.compute.ann.GroupingAggregator;
 import org.elasticsearch.compute.ann.IntermediateState;
-import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.BooleanBlock;
-import org.elasticsearch.compute.data.BooleanVector;
-import org.elasticsearch.compute.data.ConstantBooleanVector;
-import org.elasticsearch.compute.data.ConstantLongVector;
-import org.elasticsearch.compute.data.IntVector;
-import org.elasticsearch.compute.data.LongBlock;
-import org.elasticsearch.compute.data.LongVector;
 
 @Aggregator({ @IntermediateState(name = "min", type = "LONG"), @IntermediateState(name = "seen", type = "BOOLEAN") })
 @GroupingAggregator
@@ -31,40 +23,4 @@ class MinLongAggregator {
         return Math.min(current, v);
     }
 
-    public static void combineIntermediate(LongState state, LongVector values, BooleanVector seen) {
-        if (seen.getBoolean(0)) {
-            state.longValue(combine(state.longValue(), values.getLong(0)));
-            state.seen(true);
-        }
-    }
-
-    public static void evaluateIntermediate(LongState state, Block[] blocks, int offset) {
-        assert blocks.length >= offset + 2;
-        blocks[offset + 0] = new ConstantLongVector(state.longValue(), 1).asBlock();
-        blocks[offset + 1] = new ConstantBooleanVector(state.seen(), 1).asBlock();
-    }
-
-    public static void combineIntermediate(LongVector groupIdVector, LongArrayState state, LongVector values, BooleanVector seen) {
-        for (int position = 0; position < groupIdVector.getPositionCount(); position++) {
-            int groupId = Math.toIntExact(groupIdVector.getLong(position));
-            if (seen.getBoolean(position)) {
-                state.set(MinLongAggregator.combine(state.getOrDefault(groupId), values.getLong(position)), groupId);
-            } else {
-                state.putNull(groupId);
-            }
-        }
-    }
-
-    public static void evaluateIntermediate(LongArrayState state, Block[] blocks, int offset, IntVector selected) {
-        assert blocks.length >= offset + 2;
-        var valuesBuilder = LongBlock.newBlockBuilder(selected.getPositionCount());
-        var nullsBuilder = BooleanBlock.newBlockBuilder(selected.getPositionCount());
-        for (int i = 0; i < selected.getPositionCount(); i++) {
-            int group = selected.getInt(i);
-            valuesBuilder.appendLong(state.get(group));
-            nullsBuilder.appendBoolean(state.hasValue(group));
-        }
-        blocks[offset + 0] = valuesBuilder.build();
-        blocks[offset + 1] = nullsBuilder.build();
-    }
 }

+ 8 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumDoubleAggregator.java

@@ -156,6 +156,10 @@ class SumDoubleAggregator {
             super(value, delta);
         }
 
+        void toIntermediate(Block[] blocks, int offset) {
+            SumDoubleAggregator.evaluateIntermediate(this, blocks, offset);
+        }
+
         @Override
         public long getEstimatedSize() {
             throw new UnsupportedOperationException();
@@ -260,6 +264,10 @@ class SumDoubleAggregator {
             }
         }
 
+        void toIntermediate(Block[] blocks, int offset, IntVector selected) {
+            SumDoubleAggregator.evaluateIntermediate(this, blocks, offset, selected);
+        }
+
         @Override
         public long getEstimatedSize() {
             throw new UnsupportedOperationException();

+ 3 - 11
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumIntAggregator.java

@@ -11,7 +11,7 @@ import org.elasticsearch.compute.ann.Aggregator;
 import org.elasticsearch.compute.ann.GroupingAggregator;
 import org.elasticsearch.compute.ann.IntermediateState;
 
-@Aggregator({ @IntermediateState(name = "aggstate", type = "UNKNOWN") })
+@Aggregator({ @IntermediateState(name = "sum", type = "LONG"), @IntermediateState(name = "seen", type = "BOOLEAN") })
 @GroupingAggregator
 class SumIntAggregator {
 
@@ -23,15 +23,7 @@ class SumIntAggregator {
         return Math.addExact(current, v);
     }
 
-    public static void combineStates(LongState current, LongState state) {
-        current.longValue(Math.addExact(current.longValue(), state.longValue()));
-    }
-
-    public static void combineStates(LongArrayState current, int groupId, LongArrayState state, int position) {
-        if (state.hasValue(position)) {
-            current.set(Math.addExact(current.getOrDefault(groupId), state.get(position)), groupId);
-        } else {
-            current.putNull(groupId);
-        }
+    public static long combine(long current, long v) {
+        return Math.addExact(current, v);
     }
 }

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/SumLongAggregator.java

@@ -11,7 +11,7 @@ import org.elasticsearch.compute.ann.Aggregator;
 import org.elasticsearch.compute.ann.GroupingAggregator;
 import org.elasticsearch.compute.ann.IntermediateState;
 
-@Aggregator({ @IntermediateState(name = "aggstate", type = "UNKNOWN") })
+@Aggregator({ @IntermediateState(name = "sum", type = "LONG"), @IntermediateState(name = "seen", type = "BOOLEAN") })
 @GroupingAggregator
 class SumLongAggregator {
 

+ 23 - 81
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ArrayState.java.st

@@ -12,15 +12,17 @@ import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.$Type$Array;
 import org.elasticsearch.compute.ann.Experimental;
 import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+$if(long)$
+import org.elasticsearch.compute.data.IntVector;
+$endif$
 import org.elasticsearch.compute.data.$Type$Block;
 import org.elasticsearch.compute.data.$Type$Vector;
+$if(double)$
+import org.elasticsearch.compute.data.IntVector;
+$endif$
 import org.elasticsearch.core.Releasables;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.nio.ByteOrder;
-import java.util.Objects;
-
 /**
  * Aggregator state for an array of $type$s.
  * This class is generated. Do not edit it.
@@ -123,9 +125,23 @@ $endif$
         }
     }
 
+    /** Extracts an intermediate view of the contents of this state.  */
+    void toIntermediate(Block[] blocks, int offset, IntVector selected) {
+        assert blocks.length >= offset + 2;
+        var valuesBuilder = $Type$Block.newBlockBuilder(selected.getPositionCount());
+        var nullsBuilder = BooleanBlock.newBlockBuilder(selected.getPositionCount());
+        for (int i = 0; i < selected.getPositionCount(); i++) {
+            int group = selected.getInt(i);
+            valuesBuilder.append$Type$(values.get(group));
+            nullsBuilder.appendBoolean(hasValue(group));
+        }
+        blocks[offset + 0] = valuesBuilder.build();
+        blocks[offset + 1] = nullsBuilder.build();
+    }
+
     @Override
     public long getEstimatedSize() {
-        return Long.BYTES + (largestIndex + 1L) * $BYTES$ + LongArrayState.estimateSerializeSize(nonNulls);
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -135,80 +151,6 @@ $endif$
 
     @Override
     public AggregatorStateSerializer<$Type$ArrayState> serializer() {
-        return new $Type$ArrayStateSerializer();
-    }
-
-$if(long)$
-    private static final VarHandle longHandle = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
-
-    static int serializeBitArray(BitArray bits, byte[] ba, int offset) {
-        if (bits == null) {
-            longHandle.set(ba, offset, 0);
-            return Long.BYTES;
-        }
-        final LongArray array = bits.getBits();
-        longHandle.set(ba, offset, array.size());
-        offset += Long.BYTES;
-        for (long i = 0; i < array.size(); i++) {
-            longHandle.set(ba, offset, array.get(i));
-        }
-        return Long.BYTES + Math.toIntExact(array.size() * Long.BYTES);
-    }
-
-    static BitArray deseralizeBitArray(BigArrays bigArrays, byte[] ba, int offset) {
-        long size = (long) longHandle.get(ba, offset);
-        if (size == 0) {
-            return null;
-        } else {
-            offset += Long.BYTES;
-            final LongArray array = bigArrays.newLongArray(size);
-            for (long i = 0; i < size; i++) {
-                array.set(i, (long) longHandle.get(ba, offset));
-            }
-            return new BitArray(bigArrays, array);
-        }
-    }
-
-    static int estimateSerializeSize(BitArray bits) {
-        if (bits == null) {
-            return Long.BYTES;
-        }
-        return Long.BYTES + Math.toIntExact(bits.getBits().size() * Long.BYTES);
-    }
-$endif$
-
-    private static class $Type$ArrayStateSerializer implements AggregatorStateSerializer<$Type$ArrayState> {
-        private static final VarHandle lengthHandle = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
-        private static final VarHandle valueHandle = MethodHandles.byteArrayViewVarHandle($type$[].class, ByteOrder.BIG_ENDIAN);
-
-        @Override
-        public int size() {
-            return $BYTES$;
-        }
-
-        @Override
-        public int serialize($Type$ArrayState state, byte[] ba, int offset, org.elasticsearch.compute.data.IntVector selected) {
-            lengthHandle.set(ba, offset, selected.getPositionCount());
-            offset += Long.BYTES;
-            for (int i = 0; i < selected.getPositionCount(); i++) {
-                valueHandle.set(ba, offset, state.values.get(selected.getInt(i)));
-                offset += $BYTES$;
-            }
-            final int valuesBytes = Long.BYTES + ($BYTES$ * selected.getPositionCount());
-            return valuesBytes + LongArrayState.serializeBitArray(state.nonNulls, ba, offset);
-        }
-
-        @Override
-        public void deserialize($Type$ArrayState state, byte[] ba, int offset) {
-            Objects.requireNonNull(state);
-            int positions = (int) (long) lengthHandle.get(ba, offset);
-            offset += Long.BYTES;
-            for (int i = 0; i < positions; i++) {
-                state.set(($type$) valueHandle.get(ba, offset), i);
-                offset += $BYTES$;
-            }
-            state.largestIndex = positions - 1;
-            state.nonNulls = LongArrayState.deseralizeBitArray(state.bigArrays, ba, offset);
-        }
+        throw new UnsupportedOperationException();
     }
 }

+ 12 - 33
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-State.java.st

@@ -8,12 +8,9 @@
 package org.elasticsearch.compute.aggregation;
 
 import org.elasticsearch.compute.ann.Experimental;
-import org.elasticsearch.compute.data.IntVector;
-
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.VarHandle;
-import java.nio.ByteOrder;
-import java.util.Objects;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.ConstantBooleanVector;
+import org.elasticsearch.compute.data.Constant$Type$Vector;
 
 /**
  * Aggregator state for a single $type$.
@@ -48,9 +45,16 @@ final class $Type$State implements AggregatorState<$Type$State> {
         this.seen = seen;
     }
 
+    /** Extracts an intermediate view of the contents of this state.  */
+    void toIntermediate(Block[] blocks, int offset) {
+        assert blocks.length >= offset + 2;
+        blocks[offset + 0] = new Constant$Type$Vector(value, 1).asBlock();
+        blocks[offset + 1] = new ConstantBooleanVector(seen, 1).asBlock();
+    }
+
     @Override
     public long getEstimatedSize() {
-        return $BYTES$ + 1;
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -58,31 +62,6 @@ final class $Type$State implements AggregatorState<$Type$State> {
 
     @Override
     public AggregatorStateSerializer<$Type$State> serializer() {
-        return new $Type$StateSerializer();
-    }
-
-    private static class $Type$StateSerializer implements AggregatorStateSerializer<$Type$State> {
-        private static final VarHandle handle = MethodHandles.byteArrayViewVarHandle($type$[].class, ByteOrder.BIG_ENDIAN);
-
-        @Override
-        public int size() {
-            return $BYTES$ + 1;
-        }
-
-        @Override
-        public int serialize($Type$State state, byte[] ba, int offset, IntVector selected) {
-            assert selected.getPositionCount() == 1;
-            assert selected.getInt(0) == 0;
-            handle.set(ba, offset, state.value);
-            ba[offset + $BYTES$] = (byte) (state.seen ? 1 : 0);
-            return size(); // number of bytes written
-        }
-
-        @Override
-        public void deserialize($Type$State state, byte[] ba, int offset) {
-            Objects.requireNonNull(state);
-            state.value = ($type$) handle.get(ba, offset);
-            state.seen = ba[offset + $BYTES$] == (byte) 1;
-        }
+        throw new UnsupportedOperationException();
     }
 }

+ 2 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -318,7 +318,7 @@ public class OperatorTests extends ESTestCase {
         }
     }
 
-    public void testGroupingWithOrdinals() throws IOException {
+    public void testGroupingWithOrdinals() throws Exception {
         final String gField = "g";
         final int numDocs = between(100, 10000);
         final Map<BytesRef, Long> expectedCounts = new HashMap<>();
@@ -415,7 +415,7 @@ public class OperatorTests extends ESTestCase {
                             driverContext
                         ),
                         new HashAggregationOperator(
-                            List.of(CountAggregatorFunction.supplier(bigArrays, List.of(1)).groupingAggregatorFactory(FINAL)),
+                            List.of(CountAggregatorFunction.supplier(bigArrays, List.of(1, 2)).groupingAggregatorFactory(FINAL)),
                             () -> BlockHash.build(List.of(new HashAggregationOperator.GroupSpec(0, ElementType.BYTES_REF)), bigArrays),
                             driverContext
                         )