瀏覽代碼

Implementation of Delta function for absolute change in gauges over time (#135035)

* Implementation of Delta function for absolute change in gauges over time window

* csv tests

* fixup

* add extrapolation code for Delta

* fixup

* fixup

* fxup

* foxup

* fix csv tests
P 2 周之前
父節點
當前提交
ff70916fa6
共有 20 個文件被更改,包括 3416 次插入6 次删除
  1. 22 0
      x-pack/plugin/esql/compute/build.gradle
  2. 250 0
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DeltaDoubleAggregator.java
  3. 250 0
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DeltaFloatAggregator.java
  4. 250 0
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DeltaIntAggregator.java
  5. 250 0
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DeltaLongAggregator.java
  6. 46 0
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaDoubleAggregatorFunctionSupplier.java
  7. 406 0
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaDoubleGroupingAggregatorFunction.java
  8. 46 0
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaFloatAggregatorFunctionSupplier.java
  9. 406 0
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaFloatGroupingAggregatorFunction.java
  10. 46 0
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaIntAggregatorFunctionSupplier.java
  11. 405 0
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaIntGroupingAggregatorFunction.java
  12. 46 0
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaLongAggregatorFunctionSupplier.java
  13. 404 0
      x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaLongGroupingAggregatorFunction.java
  14. 250 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-DeltaAggregator.java.st
  15. 171 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-delta.csv-spec
  16. 21 5
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java
  17. 2 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
  18. 2 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
  19. 1 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java
  20. 142 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Delta.java

+ 22 - 0
x-pack/plugin/esql/compute/build.gradle

@@ -513,6 +513,28 @@ tasks.named('stringTemplates').configure {
     it.outputFile = "org/elasticsearch/compute/aggregation/IrateDoubleAggregator.java"
   }
 
+  File deltaAggregatorInputFile = file("src/main/java/org/elasticsearch/compute/aggregation/X-DeltaAggregator.java.st")
+  template {
+    it.properties = intProperties
+    it.inputFile = deltaAggregatorInputFile
+    it.outputFile = "org/elasticsearch/compute/aggregation/DeltaIntAggregator.java"
+  }
+  template {
+    it.properties = longProperties
+    it.inputFile = deltaAggregatorInputFile
+    it.outputFile = "org/elasticsearch/compute/aggregation/DeltaLongAggregator.java"
+  }
+  template {
+    it.properties = floatProperties
+    it.inputFile = deltaAggregatorInputFile
+    it.outputFile = "org/elasticsearch/compute/aggregation/DeltaFloatAggregator.java"
+  }
+  template {
+    it.properties = doubleProperties
+    it.inputFile = deltaAggregatorInputFile
+    it.outputFile = "org/elasticsearch/compute/aggregation/DeltaDoubleAggregator.java"
+  }
+
   File fallibleStateInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/aggregation/X-FallibleState.java.st")
   template {
     it.properties = booleanProperties

+ 250 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DeltaDoubleAggregator.java

@@ -0,0 +1,250 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+// begin generated imports
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ObjectArray;
+import org.elasticsearch.compute.ann.GroupingAggregator;
+import org.elasticsearch.compute.ann.IntermediateState;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.FloatBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+// end generated imports
+
+/**
+ * A rate grouping aggregation definition for double. This implementation supports the `Delta` and `idelta` functions.
+ * This class is generated. Edit `X-DeltaAggregator.java.st` instead.
+ */
+@GroupingAggregator(
+    value = {
+        @IntermediateState(name = "samples", type = "LONG_BLOCK"),
+        @IntermediateState(name = "timestamps", type = "LONG_BLOCK"),
+        @IntermediateState(name = "values", type = "DOUBLE_BLOCK") }
+)
+public class DeltaDoubleAggregator {
+    public static DoubleDeltaGroupingState initGrouping(DriverContext driverContext) {
+        return new DoubleDeltaGroupingState(driverContext.bigArrays(), driverContext.breaker());
+    }
+
+    public static void combine(DoubleDeltaGroupingState current, int groupId, double value, long timestamp) {
+        current.ensureCapacity(groupId);
+        current.append(groupId, timestamp, value);
+    }
+
+    public static void combineIntermediate(
+        DoubleDeltaGroupingState current,
+        int groupId,
+        LongBlock samples,
+        LongBlock timestamps,
+        DoubleBlock values,
+        int otherPosition
+    ) {
+        current.combine(groupId, samples, timestamps, values, otherPosition);
+    }
+
+    public static Block evaluateFinal(DoubleDeltaGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+        return state.evaluateFinal(selected, evalContext);
+    }
+
+    private static class DoubleDeltaState {
+        static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(DoubleDeltaState.class);
+        long lastTimestamp = -1;
+        long firstTimestamp = Long.MAX_VALUE;
+        double lastValue;
+        double firstValue;
+        long valuesSeen;
+
+        DoubleDeltaState(long seenTs, double seenValue) {
+            this.lastTimestamp = seenTs;
+            this.lastValue = seenValue;
+            this.firstTimestamp = seenTs;
+            this.firstValue = seenValue;
+            this.valuesSeen = 1L;
+        }
+
+        long bytesUsed() {
+            return BASE_RAM_USAGE;
+        }
+    }
+
+    public static final class DoubleDeltaGroupingState implements Releasable, Accountable, GroupingAggregatorState {
+        private ObjectArray<DoubleDeltaState> states;
+        private final BigArrays bigArrays;
+        private final CircuitBreaker breaker;
+        private long stateBytes; // for individual states
+
+        DoubleDeltaGroupingState(BigArrays bigArrays, CircuitBreaker breaker) {
+            this.bigArrays = bigArrays;
+            this.breaker = breaker;
+            this.states = bigArrays.newObjectArray(1);
+        }
+
+        void ensureCapacity(int groupId) {
+            states = bigArrays.grow(states, groupId + 1);
+        }
+
+        void adjustBreaker(long bytes) {
+            breaker.addEstimateBytesAndMaybeBreak(bytes, "<<delta aggregation>>");
+            stateBytes += bytes;
+            assert stateBytes >= 0 : stateBytes;
+        }
+
+        void append(int groupId, long timestamp, double value) {
+            var state = states.get(groupId);
+            if (state == null) {
+                state = new DoubleDeltaState(timestamp, value);
+                states.set(groupId, state);
+                adjustBreaker(state.bytesUsed());
+            } else {
+                if (timestamp >= state.lastTimestamp) {
+                    state.lastTimestamp = timestamp;
+                    state.lastValue = value;
+                    state.valuesSeen++;
+                } else if (timestamp <= state.firstTimestamp) {
+                    state.firstTimestamp = timestamp;
+                    state.firstValue = value;
+                    state.valuesSeen++;
+                } // else: ignore, too old
+            }
+        }
+
+        void combine(int groupId, LongBlock samples, LongBlock timestamps, DoubleBlock values, int otherPosition) {
+            final int valueCount = timestamps.getValueCount(otherPosition);
+            if (valueCount == 0) {
+                return;
+            }
+            final long valuesSeen = samples.getLong(samples.getFirstValueIndex(otherPosition));
+            final int firstTs = timestamps.getFirstValueIndex(otherPosition);
+            final int firstIndex = values.getFirstValueIndex(otherPosition);
+            ensureCapacity(groupId);
+            append(groupId, timestamps.getLong(firstTs), values.getDouble(firstIndex));
+            if (valueCount > 1) {
+                ensureCapacity(groupId);
+                append(groupId, timestamps.getLong(firstTs + 1), values.getDouble(firstIndex + 1));
+            }
+            // We are merging the state from upstream, which means we have seen
+            // `valuesSeen` values, but we have already counted one or two of them,
+            // which is represented by `valueCount - 1`.
+            states.get(groupId).valuesSeen += valuesSeen - valueCount;
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return states.ramBytesUsed() + stateBytes;
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(states, () -> adjustBreaker(-stateBytes));
+        }
+
+        @Override
+        public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
+            assert blocks.length >= offset + 2 : "blocks=" + blocks.length + ",offset=" + offset;
+            final BlockFactory blockFactory = driverContext.blockFactory();
+            final int positionCount = selected.getPositionCount();
+            try (
+                LongBlock.Builder samples = blockFactory.newLongBlockBuilder(positionCount);
+                LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2);
+                DoubleBlock.Builder values = blockFactory.newDoubleBlockBuilder(positionCount * 2);
+            ) {
+                for (int i = 0; i < positionCount; i++) {
+                    final var groupId = selected.getInt(i);
+                    final var state = groupId < states.size() ? states.get(groupId) : null;
+                    if (state != null) {
+                        samples.beginPositionEntry();
+                        samples.appendLong(state.valuesSeen);
+                        samples.endPositionEntry();
+                        timestamps.beginPositionEntry();
+                        timestamps.appendLong(state.lastTimestamp);
+                        if (state.valuesSeen > 1) {
+                            timestamps.appendLong(state.firstTimestamp);
+                        }
+                        timestamps.endPositionEntry();
+
+                        values.beginPositionEntry();
+                        values.appendDouble(state.lastValue);
+                        if (state.valuesSeen > 1) {
+                            values.appendDouble(state.firstValue);
+                        }
+                        values.endPositionEntry();
+                    } else {
+                        samples.appendLong(0L);
+                        timestamps.appendNull();
+                        values.appendNull();
+                    }
+                }
+                blocks[offset] = samples.build();
+                blocks[offset + 1] = timestamps.build();
+                blocks[offset + 2] = values.build();
+            }
+        }
+
+        Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+            int positionCount = selected.getPositionCount();
+            try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) {
+                for (int p = 0; p < positionCount; p++) {
+                    final var groupId = selected.getInt(p);
+                    final var state = groupId < states.size() ? states.get(groupId) : null;
+                    if (state == null || state.valuesSeen < 2) {
+                        rates.appendNull();
+                        continue;
+                    }
+                    if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) {
+                        // At this point we want to apply extrapolation
+                        var rangeStart = tsContext.rangeStartInMillis(groupId);
+                        var rangeEnd = tsContext.rangeEndInMillis(groupId);
+                        if (state.lastTimestamp - state.firstTimestamp == 0) {
+                            rates.appendNull();
+                            continue;
+                        }
+                        double startGap = state.firstTimestamp - rangeStart;
+                        final double averageSampleInterval = (state.lastTimestamp - state.firstTimestamp) / state.valuesSeen;
+                        final double slope = (state.lastValue - state.firstValue) / (state.lastTimestamp - state.firstTimestamp);
+                        double endGap = rangeEnd - state.lastTimestamp;
+                        double calculatedFirstValue = state.firstValue;
+                        if (startGap > 0) {
+                            if (startGap > averageSampleInterval * 1.1) {
+                                startGap = averageSampleInterval / 2.0;
+                            }
+                            calculatedFirstValue = calculatedFirstValue - startGap * slope;
+                        }
+                        double calculatedLastValue = state.lastValue;
+                        if (endGap > 0) {
+                            if (endGap > averageSampleInterval * 1.1) {
+                                endGap = averageSampleInterval / 2.0;
+                            }
+                            calculatedLastValue = calculatedLastValue + endGap * slope;
+                        }
+                        rates.appendDouble(calculatedLastValue - calculatedFirstValue);
+                    } else {
+                        rates.appendDouble(state.lastValue - state.firstValue);
+                    }
+                }
+                return rates.build();
+            }
+        }
+
+        @Override
+        public void enableGroupIdTracking(SeenGroupIds seenGroupIds) {
+            // noop - we handle the null states inside `toIntermediate` and `evaluateFinal`
+        }
+    }
+}

+ 250 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DeltaFloatAggregator.java

@@ -0,0 +1,250 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+// begin generated imports
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ObjectArray;
+import org.elasticsearch.compute.ann.GroupingAggregator;
+import org.elasticsearch.compute.ann.IntermediateState;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.FloatBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+// end generated imports
+
+/**
+ * A rate grouping aggregation definition for float. This implementation supports the `Delta` and `idelta` functions.
+ * This class is generated. Edit `X-DeltaAggregator.java.st` instead.
+ */
+@GroupingAggregator(
+    value = {
+        @IntermediateState(name = "samples", type = "LONG_BLOCK"),
+        @IntermediateState(name = "timestamps", type = "LONG_BLOCK"),
+        @IntermediateState(name = "values", type = "FLOAT_BLOCK") }
+)
+public class DeltaFloatAggregator {
+    public static FloatDeltaGroupingState initGrouping(DriverContext driverContext) {
+        return new FloatDeltaGroupingState(driverContext.bigArrays(), driverContext.breaker());
+    }
+
+    public static void combine(FloatDeltaGroupingState current, int groupId, float value, long timestamp) {
+        current.ensureCapacity(groupId);
+        current.append(groupId, timestamp, value);
+    }
+
+    public static void combineIntermediate(
+        FloatDeltaGroupingState current,
+        int groupId,
+        LongBlock samples,
+        LongBlock timestamps,
+        FloatBlock values,
+        int otherPosition
+    ) {
+        current.combine(groupId, samples, timestamps, values, otherPosition);
+    }
+
+    public static Block evaluateFinal(FloatDeltaGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+        return state.evaluateFinal(selected, evalContext);
+    }
+
+    private static class FloatDeltaState {
+        static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(FloatDeltaState.class);
+        long lastTimestamp = -1;
+        long firstTimestamp = Long.MAX_VALUE;
+        float lastValue;
+        float firstValue;
+        long valuesSeen;
+
+        FloatDeltaState(long seenTs, float seenValue) {
+            this.lastTimestamp = seenTs;
+            this.lastValue = seenValue;
+            this.firstTimestamp = seenTs;
+            this.firstValue = seenValue;
+            this.valuesSeen = 1L;
+        }
+
+        long bytesUsed() {
+            return BASE_RAM_USAGE;
+        }
+    }
+
+    public static final class FloatDeltaGroupingState implements Releasable, Accountable, GroupingAggregatorState {
+        private ObjectArray<FloatDeltaState> states;
+        private final BigArrays bigArrays;
+        private final CircuitBreaker breaker;
+        private long stateBytes; // for individual states
+
+        FloatDeltaGroupingState(BigArrays bigArrays, CircuitBreaker breaker) {
+            this.bigArrays = bigArrays;
+            this.breaker = breaker;
+            this.states = bigArrays.newObjectArray(1);
+        }
+
+        void ensureCapacity(int groupId) {
+            states = bigArrays.grow(states, groupId + 1);
+        }
+
+        void adjustBreaker(long bytes) {
+            breaker.addEstimateBytesAndMaybeBreak(bytes, "<<delta aggregation>>");
+            stateBytes += bytes;
+            assert stateBytes >= 0 : stateBytes;
+        }
+
+        void append(int groupId, long timestamp, float value) {
+            var state = states.get(groupId);
+            if (state == null) {
+                state = new FloatDeltaState(timestamp, value);
+                states.set(groupId, state);
+                adjustBreaker(state.bytesUsed());
+            } else {
+                if (timestamp >= state.lastTimestamp) {
+                    state.lastTimestamp = timestamp;
+                    state.lastValue = value;
+                    state.valuesSeen++;
+                } else if (timestamp <= state.firstTimestamp) {
+                    state.firstTimestamp = timestamp;
+                    state.firstValue = value;
+                    state.valuesSeen++;
+                } // else: ignore, too old
+            }
+        }
+
+        void combine(int groupId, LongBlock samples, LongBlock timestamps, FloatBlock values, int otherPosition) {
+            final int valueCount = timestamps.getValueCount(otherPosition);
+            if (valueCount == 0) {
+                return;
+            }
+            final long valuesSeen = samples.getLong(samples.getFirstValueIndex(otherPosition));
+            final int firstTs = timestamps.getFirstValueIndex(otherPosition);
+            final int firstIndex = values.getFirstValueIndex(otherPosition);
+            ensureCapacity(groupId);
+            append(groupId, timestamps.getLong(firstTs), values.getFloat(firstIndex));
+            if (valueCount > 1) {
+                ensureCapacity(groupId);
+                append(groupId, timestamps.getLong(firstTs + 1), values.getFloat(firstIndex + 1));
+            }
+            // We are merging the state from upstream, which means we have seen
+            // `valuesSeen` values, but we have already counted one or two of them,
+            // which is represented by `valueCount - 1`.
+            states.get(groupId).valuesSeen += valuesSeen - valueCount;
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return states.ramBytesUsed() + stateBytes;
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(states, () -> adjustBreaker(-stateBytes));
+        }
+
+        @Override
+        public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
+            assert blocks.length >= offset + 2 : "blocks=" + blocks.length + ",offset=" + offset;
+            final BlockFactory blockFactory = driverContext.blockFactory();
+            final int positionCount = selected.getPositionCount();
+            try (
+                LongBlock.Builder samples = blockFactory.newLongBlockBuilder(positionCount);
+                LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2);
+                FloatBlock.Builder values = blockFactory.newFloatBlockBuilder(positionCount * 2);
+            ) {
+                for (int i = 0; i < positionCount; i++) {
+                    final var groupId = selected.getInt(i);
+                    final var state = groupId < states.size() ? states.get(groupId) : null;
+                    if (state != null) {
+                        samples.beginPositionEntry();
+                        samples.appendLong(state.valuesSeen);
+                        samples.endPositionEntry();
+                        timestamps.beginPositionEntry();
+                        timestamps.appendLong(state.lastTimestamp);
+                        if (state.valuesSeen > 1) {
+                            timestamps.appendLong(state.firstTimestamp);
+                        }
+                        timestamps.endPositionEntry();
+
+                        values.beginPositionEntry();
+                        values.appendFloat(state.lastValue);
+                        if (state.valuesSeen > 1) {
+                            values.appendFloat(state.firstValue);
+                        }
+                        values.endPositionEntry();
+                    } else {
+                        samples.appendLong(0L);
+                        timestamps.appendNull();
+                        values.appendNull();
+                    }
+                }
+                blocks[offset] = samples.build();
+                blocks[offset + 1] = timestamps.build();
+                blocks[offset + 2] = values.build();
+            }
+        }
+
+        Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+            int positionCount = selected.getPositionCount();
+            try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) {
+                for (int p = 0; p < positionCount; p++) {
+                    final var groupId = selected.getInt(p);
+                    final var state = groupId < states.size() ? states.get(groupId) : null;
+                    if (state == null || state.valuesSeen < 2) {
+                        rates.appendNull();
+                        continue;
+                    }
+                    if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) {
+                        // At this point we want to apply extrapolation
+                        var rangeStart = tsContext.rangeStartInMillis(groupId);
+                        var rangeEnd = tsContext.rangeEndInMillis(groupId);
+                        if (state.lastTimestamp - state.firstTimestamp == 0) {
+                            rates.appendNull();
+                            continue;
+                        }
+                        double startGap = state.firstTimestamp - rangeStart;
+                        final double averageSampleInterval = (state.lastTimestamp - state.firstTimestamp) / state.valuesSeen;
+                        final double slope = (state.lastValue - state.firstValue) / (state.lastTimestamp - state.firstTimestamp);
+                        double endGap = rangeEnd - state.lastTimestamp;
+                        double calculatedFirstValue = state.firstValue;
+                        if (startGap > 0) {
+                            if (startGap > averageSampleInterval * 1.1) {
+                                startGap = averageSampleInterval / 2.0;
+                            }
+                            calculatedFirstValue = calculatedFirstValue - startGap * slope;
+                        }
+                        double calculatedLastValue = state.lastValue;
+                        if (endGap > 0) {
+                            if (endGap > averageSampleInterval * 1.1) {
+                                endGap = averageSampleInterval / 2.0;
+                            }
+                            calculatedLastValue = calculatedLastValue + endGap * slope;
+                        }
+                        rates.appendDouble(calculatedLastValue - calculatedFirstValue);
+                    } else {
+                        rates.appendDouble(state.lastValue - state.firstValue);
+                    }
+                }
+                return rates.build();
+            }
+        }
+
+        @Override
+        public void enableGroupIdTracking(SeenGroupIds seenGroupIds) {
+            // noop - we handle the null states inside `toIntermediate` and `evaluateFinal`
+        }
+    }
+}

+ 250 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DeltaIntAggregator.java

@@ -0,0 +1,250 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+// begin generated imports
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ObjectArray;
+import org.elasticsearch.compute.ann.GroupingAggregator;
+import org.elasticsearch.compute.ann.IntermediateState;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.FloatBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+// end generated imports
+
+/**
+ * A rate grouping aggregation definition for int. This implementation supports the `Delta` and `idelta` functions.
+ * This class is generated. Edit `X-DeltaAggregator.java.st` instead.
+ */
+@GroupingAggregator(
+    value = {
+        @IntermediateState(name = "samples", type = "LONG_BLOCK"),
+        @IntermediateState(name = "timestamps", type = "LONG_BLOCK"),
+        @IntermediateState(name = "values", type = "INT_BLOCK") }
+)
+public class DeltaIntAggregator {
+    public static IntDeltaGroupingState initGrouping(DriverContext driverContext) {
+        return new IntDeltaGroupingState(driverContext.bigArrays(), driverContext.breaker());
+    }
+
+    public static void combine(IntDeltaGroupingState current, int groupId, int value, long timestamp) {
+        current.ensureCapacity(groupId);
+        current.append(groupId, timestamp, value);
+    }
+
+    public static void combineIntermediate(
+        IntDeltaGroupingState current,
+        int groupId,
+        LongBlock samples,
+        LongBlock timestamps,
+        IntBlock values,
+        int otherPosition
+    ) {
+        current.combine(groupId, samples, timestamps, values, otherPosition);
+    }
+
+    public static Block evaluateFinal(IntDeltaGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+        return state.evaluateFinal(selected, evalContext);
+    }
+
+    private static class IntDeltaState {
+        static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(IntDeltaState.class);
+        long lastTimestamp = -1;
+        long firstTimestamp = Long.MAX_VALUE;
+        int lastValue;
+        int firstValue;
+        long valuesSeen;
+
+        IntDeltaState(long seenTs, int seenValue) {
+            this.lastTimestamp = seenTs;
+            this.lastValue = seenValue;
+            this.firstTimestamp = seenTs;
+            this.firstValue = seenValue;
+            this.valuesSeen = 1L;
+        }
+
+        long bytesUsed() {
+            return BASE_RAM_USAGE;
+        }
+    }
+
+    public static final class IntDeltaGroupingState implements Releasable, Accountable, GroupingAggregatorState {
+        private ObjectArray<IntDeltaState> states;
+        private final BigArrays bigArrays;
+        private final CircuitBreaker breaker;
+        private long stateBytes; // for individual states
+
+        IntDeltaGroupingState(BigArrays bigArrays, CircuitBreaker breaker) {
+            this.bigArrays = bigArrays;
+            this.breaker = breaker;
+            this.states = bigArrays.newObjectArray(1);
+        }
+
+        void ensureCapacity(int groupId) {
+            states = bigArrays.grow(states, groupId + 1);
+        }
+
+        void adjustBreaker(long bytes) {
+            breaker.addEstimateBytesAndMaybeBreak(bytes, "<<delta aggregation>>");
+            stateBytes += bytes;
+            assert stateBytes >= 0 : stateBytes;
+        }
+
+        void append(int groupId, long timestamp, int value) {
+            var state = states.get(groupId);
+            if (state == null) {
+                state = new IntDeltaState(timestamp, value);
+                states.set(groupId, state);
+                adjustBreaker(state.bytesUsed());
+            } else {
+                if (timestamp >= state.lastTimestamp) {
+                    state.lastTimestamp = timestamp;
+                    state.lastValue = value;
+                    state.valuesSeen++;
+                } else if (timestamp <= state.firstTimestamp) {
+                    state.firstTimestamp = timestamp;
+                    state.firstValue = value;
+                    state.valuesSeen++;
+                } // else: ignore, too old
+            }
+        }
+
+        void combine(int groupId, LongBlock samples, LongBlock timestamps, IntBlock values, int otherPosition) {
+            final int valueCount = timestamps.getValueCount(otherPosition);
+            if (valueCount == 0) {
+                return;
+            }
+            final long valuesSeen = samples.getLong(samples.getFirstValueIndex(otherPosition));
+            final int firstTs = timestamps.getFirstValueIndex(otherPosition);
+            final int firstIndex = values.getFirstValueIndex(otherPosition);
+            ensureCapacity(groupId);
+            append(groupId, timestamps.getLong(firstTs), values.getInt(firstIndex));
+            if (valueCount > 1) {
+                ensureCapacity(groupId);
+                append(groupId, timestamps.getLong(firstTs + 1), values.getInt(firstIndex + 1));
+            }
+            // We are merging the state from upstream, which means we have seen
+            // `valuesSeen` values, but we have already counted one or two of them,
+            // which is represented by `valueCount - 1`.
+            states.get(groupId).valuesSeen += valuesSeen - valueCount;
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return states.ramBytesUsed() + stateBytes;
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(states, () -> adjustBreaker(-stateBytes));
+        }
+
+        @Override
+        public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
+            assert blocks.length >= offset + 2 : "blocks=" + blocks.length + ",offset=" + offset;
+            final BlockFactory blockFactory = driverContext.blockFactory();
+            final int positionCount = selected.getPositionCount();
+            try (
+                LongBlock.Builder samples = blockFactory.newLongBlockBuilder(positionCount);
+                LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2);
+                IntBlock.Builder values = blockFactory.newIntBlockBuilder(positionCount * 2);
+            ) {
+                for (int i = 0; i < positionCount; i++) {
+                    final var groupId = selected.getInt(i);
+                    final var state = groupId < states.size() ? states.get(groupId) : null;
+                    if (state != null) {
+                        samples.beginPositionEntry();
+                        samples.appendLong(state.valuesSeen);
+                        samples.endPositionEntry();
+                        timestamps.beginPositionEntry();
+                        timestamps.appendLong(state.lastTimestamp);
+                        if (state.valuesSeen > 1) {
+                            timestamps.appendLong(state.firstTimestamp);
+                        }
+                        timestamps.endPositionEntry();
+
+                        values.beginPositionEntry();
+                        values.appendInt(state.lastValue);
+                        if (state.valuesSeen > 1) {
+                            values.appendInt(state.firstValue);
+                        }
+                        values.endPositionEntry();
+                    } else {
+                        samples.appendLong(0L);
+                        timestamps.appendNull();
+                        values.appendNull();
+                    }
+                }
+                blocks[offset] = samples.build();
+                blocks[offset + 1] = timestamps.build();
+                blocks[offset + 2] = values.build();
+            }
+        }
+
+        Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+            int positionCount = selected.getPositionCount();
+            try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) {
+                for (int p = 0; p < positionCount; p++) {
+                    final var groupId = selected.getInt(p);
+                    final var state = groupId < states.size() ? states.get(groupId) : null;
+                    if (state == null || state.valuesSeen < 2) {
+                        rates.appendNull();
+                        continue;
+                    }
+                    if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) {
+                        // At this point we want to apply extrapolation
+                        var rangeStart = tsContext.rangeStartInMillis(groupId);
+                        var rangeEnd = tsContext.rangeEndInMillis(groupId);
+                        if (state.lastTimestamp - state.firstTimestamp == 0) {
+                            rates.appendNull();
+                            continue;
+                        }
+                        double startGap = state.firstTimestamp - rangeStart;
+                        final double averageSampleInterval = (state.lastTimestamp - state.firstTimestamp) / state.valuesSeen;
+                        final double slope = (state.lastValue - state.firstValue) / (state.lastTimestamp - state.firstTimestamp);
+                        double endGap = rangeEnd - state.lastTimestamp;
+                        double calculatedFirstValue = state.firstValue;
+                        if (startGap > 0) {
+                            if (startGap > averageSampleInterval * 1.1) {
+                                startGap = averageSampleInterval / 2.0;
+                            }
+                            calculatedFirstValue = calculatedFirstValue - startGap * slope;
+                        }
+                        double calculatedLastValue = state.lastValue;
+                        if (endGap > 0) {
+                            if (endGap > averageSampleInterval * 1.1) {
+                                endGap = averageSampleInterval / 2.0;
+                            }
+                            calculatedLastValue = calculatedLastValue + endGap * slope;
+                        }
+                        rates.appendDouble(calculatedLastValue - calculatedFirstValue);
+                    } else {
+                        rates.appendDouble(state.lastValue - state.firstValue);
+                    }
+                }
+                return rates.build();
+            }
+        }
+
+        @Override
+        public void enableGroupIdTracking(SeenGroupIds seenGroupIds) {
+            // noop - we handle the null states inside `toIntermediate` and `evaluateFinal`
+        }
+    }
+}

+ 250 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/DeltaLongAggregator.java

@@ -0,0 +1,250 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+// begin generated imports
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ObjectArray;
+import org.elasticsearch.compute.ann.GroupingAggregator;
+import org.elasticsearch.compute.ann.IntermediateState;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.FloatBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+// end generated imports
+
+/**
+ * A rate grouping aggregation definition for long. This implementation supports the `Delta` and `idelta` functions.
+ * This class is generated. Edit `X-DeltaAggregator.java.st` instead.
+ */
+@GroupingAggregator(
+    value = {
+        @IntermediateState(name = "samples", type = "LONG_BLOCK"),
+        @IntermediateState(name = "timestamps", type = "LONG_BLOCK"),
+        @IntermediateState(name = "values", type = "LONG_BLOCK") }
+)
+public class DeltaLongAggregator {
+    public static LongDeltaGroupingState initGrouping(DriverContext driverContext) {
+        return new LongDeltaGroupingState(driverContext.bigArrays(), driverContext.breaker());
+    }
+
+    public static void combine(LongDeltaGroupingState current, int groupId, long value, long timestamp) {
+        current.ensureCapacity(groupId);
+        current.append(groupId, timestamp, value);
+    }
+
+    public static void combineIntermediate(
+        LongDeltaGroupingState current,
+        int groupId,
+        LongBlock samples,
+        LongBlock timestamps,
+        LongBlock values,
+        int otherPosition
+    ) {
+        current.combine(groupId, samples, timestamps, values, otherPosition);
+    }
+
+    public static Block evaluateFinal(LongDeltaGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+        return state.evaluateFinal(selected, evalContext);
+    }
+
+    private static class LongDeltaState {
+        static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject(LongDeltaState.class);
+        long lastTimestamp = -1;
+        long firstTimestamp = Long.MAX_VALUE;
+        long lastValue;
+        long firstValue;
+        long valuesSeen;
+
+        LongDeltaState(long seenTs, long seenValue) {
+            this.lastTimestamp = seenTs;
+            this.lastValue = seenValue;
+            this.firstTimestamp = seenTs;
+            this.firstValue = seenValue;
+            this.valuesSeen = 1L;
+        }
+
+        long bytesUsed() {
+            return BASE_RAM_USAGE;
+        }
+    }
+
+    public static final class LongDeltaGroupingState implements Releasable, Accountable, GroupingAggregatorState {
+        private ObjectArray<LongDeltaState> states;
+        private final BigArrays bigArrays;
+        private final CircuitBreaker breaker;
+        private long stateBytes; // for individual states
+
+        LongDeltaGroupingState(BigArrays bigArrays, CircuitBreaker breaker) {
+            this.bigArrays = bigArrays;
+            this.breaker = breaker;
+            this.states = bigArrays.newObjectArray(1);
+        }
+
+        void ensureCapacity(int groupId) {
+            states = bigArrays.grow(states, groupId + 1);
+        }
+
+        void adjustBreaker(long bytes) {
+            breaker.addEstimateBytesAndMaybeBreak(bytes, "<<delta aggregation>>");
+            stateBytes += bytes;
+            assert stateBytes >= 0 : stateBytes;
+        }
+
+        void append(int groupId, long timestamp, long value) {
+            var state = states.get(groupId);
+            if (state == null) {
+                state = new LongDeltaState(timestamp, value);
+                states.set(groupId, state);
+                adjustBreaker(state.bytesUsed());
+            } else {
+                if (timestamp >= state.lastTimestamp) {
+                    state.lastTimestamp = timestamp;
+                    state.lastValue = value;
+                    state.valuesSeen++;
+                } else if (timestamp <= state.firstTimestamp) {
+                    state.firstTimestamp = timestamp;
+                    state.firstValue = value;
+                    state.valuesSeen++;
+                } // else: ignore, too old
+            }
+        }
+
+        void combine(int groupId, LongBlock samples, LongBlock timestamps, LongBlock values, int otherPosition) {
+            final int valueCount = timestamps.getValueCount(otherPosition);
+            if (valueCount == 0) {
+                return;
+            }
+            final long valuesSeen = samples.getLong(samples.getFirstValueIndex(otherPosition));
+            final int firstTs = timestamps.getFirstValueIndex(otherPosition);
+            final int firstIndex = values.getFirstValueIndex(otherPosition);
+            ensureCapacity(groupId);
+            append(groupId, timestamps.getLong(firstTs), values.getLong(firstIndex));
+            if (valueCount > 1) {
+                ensureCapacity(groupId);
+                append(groupId, timestamps.getLong(firstTs + 1), values.getLong(firstIndex + 1));
+            }
+            // We are merging the state from upstream, which means we have seen
+            // `valuesSeen` values, but we have already counted one or two of them,
+            // which is represented by `valueCount - 1`.
+            states.get(groupId).valuesSeen += valuesSeen - valueCount;
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return states.ramBytesUsed() + stateBytes;
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(states, () -> adjustBreaker(-stateBytes));
+        }
+
+        @Override
+        public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
+            assert blocks.length >= offset + 2 : "blocks=" + blocks.length + ",offset=" + offset;
+            final BlockFactory blockFactory = driverContext.blockFactory();
+            final int positionCount = selected.getPositionCount();
+            try (
+                LongBlock.Builder samples = blockFactory.newLongBlockBuilder(positionCount);
+                LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2);
+                LongBlock.Builder values = blockFactory.newLongBlockBuilder(positionCount * 2);
+            ) {
+                for (int i = 0; i < positionCount; i++) {
+                    final var groupId = selected.getInt(i);
+                    final var state = groupId < states.size() ? states.get(groupId) : null;
+                    if (state != null) {
+                        samples.beginPositionEntry();
+                        samples.appendLong(state.valuesSeen);
+                        samples.endPositionEntry();
+                        timestamps.beginPositionEntry();
+                        timestamps.appendLong(state.lastTimestamp);
+                        if (state.valuesSeen > 1) {
+                            timestamps.appendLong(state.firstTimestamp);
+                        }
+                        timestamps.endPositionEntry();
+
+                        values.beginPositionEntry();
+                        values.appendLong(state.lastValue);
+                        if (state.valuesSeen > 1) {
+                            values.appendLong(state.firstValue);
+                        }
+                        values.endPositionEntry();
+                    } else {
+                        samples.appendLong(0L);
+                        timestamps.appendNull();
+                        values.appendNull();
+                    }
+                }
+                blocks[offset] = samples.build();
+                blocks[offset + 1] = timestamps.build();
+                blocks[offset + 2] = values.build();
+            }
+        }
+
+        Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+            int positionCount = selected.getPositionCount();
+            try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) {
+                for (int p = 0; p < positionCount; p++) {
+                    final var groupId = selected.getInt(p);
+                    final var state = groupId < states.size() ? states.get(groupId) : null;
+                    if (state == null || state.valuesSeen < 2) {
+                        rates.appendNull();
+                        continue;
+                    }
+                    if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) {
+                        // At this point we want to apply extrapolation
+                        var rangeStart = tsContext.rangeStartInMillis(groupId);
+                        var rangeEnd = tsContext.rangeEndInMillis(groupId);
+                        if (state.lastTimestamp - state.firstTimestamp == 0) {
+                            rates.appendNull();
+                            continue;
+                        }
+                        double startGap = state.firstTimestamp - rangeStart;
+                        final double averageSampleInterval = (state.lastTimestamp - state.firstTimestamp) / state.valuesSeen;
+                        final double slope = (state.lastValue - state.firstValue) / (state.lastTimestamp - state.firstTimestamp);
+                        double endGap = rangeEnd - state.lastTimestamp;
+                        double calculatedFirstValue = state.firstValue;
+                        if (startGap > 0) {
+                            if (startGap > averageSampleInterval * 1.1) {
+                                startGap = averageSampleInterval / 2.0;
+                            }
+                            calculatedFirstValue = calculatedFirstValue - startGap * slope;
+                        }
+                        double calculatedLastValue = state.lastValue;
+                        if (endGap > 0) {
+                            if (endGap > averageSampleInterval * 1.1) {
+                                endGap = averageSampleInterval / 2.0;
+                            }
+                            calculatedLastValue = calculatedLastValue + endGap * slope;
+                        }
+                        rates.appendDouble(calculatedLastValue - calculatedFirstValue);
+                    } else {
+                        rates.appendDouble(state.lastValue - state.firstValue);
+                    }
+                }
+                return rates.build();
+            }
+        }
+
+        @Override
+        public void enableGroupIdTracking(SeenGroupIds seenGroupIds) {
+            // noop - we handle the null states inside `toIntermediate` and `evaluateFinal`
+        }
+    }
+}

+ 46 - 0
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaDoubleAggregatorFunctionSupplier.java

@@ -0,0 +1,46 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.compute.aggregation;
+
+import java.lang.Integer;
+import java.lang.Override;
+import java.lang.String;
+import java.util.List;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * {@link AggregatorFunctionSupplier} implementation for {@link DeltaDoubleAggregator}.
+ * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
+ */
+public final class DeltaDoubleAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
+  public DeltaDoubleAggregatorFunctionSupplier() {
+  }
+
+  @Override
+  public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
+    return DeltaDoubleGroupingAggregatorFunction.intermediateStateDesc();
+  }
+
+  @Override
+  public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public DeltaDoubleGroupingAggregatorFunction groupingAggregator(DriverContext driverContext,
+      List<Integer> channels) {
+    return DeltaDoubleGroupingAggregatorFunction.create(channels, driverContext);
+  }
+
+  @Override
+  public String describe() {
+    return "delta of doubles";
+  }
+}

+ 406 - 0
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaDoubleGroupingAggregatorFunction.java

@@ -0,0 +1,406 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.compute.aggregation;
+
+import java.lang.Integer;
+import java.lang.Override;
+import java.lang.String;
+import java.lang.StringBuilder;
+import java.util.List;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntArrayBlock;
+import org.elasticsearch.compute.data.IntBigArrayBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * {@link GroupingAggregatorFunction} implementation for {@link DeltaDoubleAggregator}.
+ * This class is generated. Edit {@code GroupingAggregatorImplementer} instead.
+ */
+public final class DeltaDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction {
+  private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
+      new IntermediateStateDesc("samples", ElementType.LONG),
+      new IntermediateStateDesc("timestamps", ElementType.LONG),
+      new IntermediateStateDesc("values", ElementType.DOUBLE)  );
+
+  private final DeltaDoubleAggregator.DoubleDeltaGroupingState state;
+
+  private final List<Integer> channels;
+
+  private final DriverContext driverContext;
+
+  public DeltaDoubleGroupingAggregatorFunction(List<Integer> channels,
+      DeltaDoubleAggregator.DoubleDeltaGroupingState state, DriverContext driverContext) {
+    this.channels = channels;
+    this.state = state;
+    this.driverContext = driverContext;
+  }
+
+  public static DeltaDoubleGroupingAggregatorFunction create(List<Integer> channels,
+      DriverContext driverContext) {
+    return new DeltaDoubleGroupingAggregatorFunction(channels, DeltaDoubleAggregator.initGrouping(driverContext), driverContext);
+  }
+
+  public static List<IntermediateStateDesc> intermediateStateDesc() {
+    return INTERMEDIATE_STATE_DESC;
+  }
+
+  @Override
+  public int intermediateBlockCount() {
+    return INTERMEDIATE_STATE_DESC.size();
+  }
+
+  @Override
+  public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds,
+      Page page) {
+    DoubleBlock valueBlock = page.getBlock(channels.get(0));
+    LongBlock timestampBlock = page.getBlock(channels.get(1));
+    DoubleVector valueVector = valueBlock.asVector();
+    if (valueVector == null) {
+      maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock);
+      return new GroupingAggregatorFunction.AddInput() {
+        @Override
+        public void add(int positionOffset, IntArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntBigArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntVector groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void close() {
+        }
+      };
+    }
+    LongVector timestampVector = timestampBlock.asVector();
+    if (timestampVector == null) {
+      maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock);
+      return new GroupingAggregatorFunction.AddInput() {
+        @Override
+        public void add(int positionOffset, IntArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntBigArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntVector groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void close() {
+        }
+      };
+    }
+    return new GroupingAggregatorFunction.AddInput() {
+      @Override
+      public void add(int positionOffset, IntArrayBlock groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void add(int positionOffset, IntBigArrayBlock groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void add(int positionOffset, IntVector groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void close() {
+      }
+    };
+  }
+
+  private void addRawInput(int positionOffset, IntArrayBlock groups, DoubleBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+        int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+        for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+          double valueValue = valueBlock.getDouble(valueOffset);
+          int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+          int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+          for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+            long timestampValue = timestampBlock.getLong(timestampOffset);
+            DeltaDoubleAggregator.combine(state, groupId, valueValue, timestampValue);
+          }
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntArrayBlock groups, DoubleVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        double valueValue = valueVector.getDouble(valuesPosition);
+        long timestampValue = timestampVector.getLong(valuesPosition);
+        DeltaDoubleAggregator.combine(state, groupId, valueValue, timestampValue);
+      }
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    DoubleBlock values = (DoubleBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valuesPosition = groupPosition + positionOffset;
+        DeltaDoubleAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntBigArrayBlock groups, DoubleBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+        int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+        for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+          double valueValue = valueBlock.getDouble(valueOffset);
+          int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+          int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+          for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+            long timestampValue = timestampBlock.getLong(timestampOffset);
+            DeltaDoubleAggregator.combine(state, groupId, valueValue, timestampValue);
+          }
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntBigArrayBlock groups, DoubleVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        double valueValue = valueVector.getDouble(valuesPosition);
+        long timestampValue = timestampVector.getLong(valuesPosition);
+        DeltaDoubleAggregator.combine(state, groupId, valueValue, timestampValue);
+      }
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    DoubleBlock values = (DoubleBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valuesPosition = groupPosition + positionOffset;
+        DeltaDoubleAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntVector groups, DoubleBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupId = groups.getInt(groupPosition);
+      int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+      int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+      for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+        double valueValue = valueBlock.getDouble(valueOffset);
+        int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+        int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+        for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+          long timestampValue = timestampBlock.getLong(timestampOffset);
+          DeltaDoubleAggregator.combine(state, groupId, valueValue, timestampValue);
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntVector groups, DoubleVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int valuesPosition = groupPosition + positionOffset;
+      int groupId = groups.getInt(groupPosition);
+      double valueValue = valueVector.getDouble(valuesPosition);
+      long timestampValue = timestampVector.getLong(valuesPosition);
+      DeltaDoubleAggregator.combine(state, groupId, valueValue, timestampValue);
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntVector groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    DoubleBlock values = (DoubleBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int groupId = groups.getInt(groupPosition);
+      int valuesPosition = groupPosition + positionOffset;
+      DeltaDoubleAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+    }
+  }
+
+  private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, DoubleBlock valueBlock,
+      LongBlock timestampBlock) {
+    if (valueBlock.mayHaveNulls()) {
+      state.enableGroupIdTracking(seenGroupIds);
+    }
+    if (timestampBlock.mayHaveNulls()) {
+      state.enableGroupIdTracking(seenGroupIds);
+    }
+  }
+
+  @Override
+  public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) {
+    state.enableGroupIdTracking(seenGroupIds);
+  }
+
+  @Override
+  public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
+    state.toIntermediate(blocks, offset, selected, driverContext);
+  }
+
+  @Override
+  public void evaluateFinal(Block[] blocks, int offset, IntVector selected,
+      GroupingAggregatorEvaluationContext ctx) {
+    blocks[offset] = DeltaDoubleAggregator.evaluateFinal(state, selected, ctx);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getSimpleName()).append("[");
+    sb.append("channels=").append(channels);
+    sb.append("]");
+    return sb.toString();
+  }
+
+  @Override
+  public void close() {
+    state.close();
+  }
+}

+ 46 - 0
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaFloatAggregatorFunctionSupplier.java

@@ -0,0 +1,46 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.compute.aggregation;
+
+import java.lang.Integer;
+import java.lang.Override;
+import java.lang.String;
+import java.util.List;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * {@link AggregatorFunctionSupplier} implementation for {@link DeltaFloatAggregator}.
+ * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
+ */
+public final class DeltaFloatAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
+  public DeltaFloatAggregatorFunctionSupplier() {
+  }
+
+  @Override
+  public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
+    return DeltaFloatGroupingAggregatorFunction.intermediateStateDesc();
+  }
+
+  @Override
+  public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public DeltaFloatGroupingAggregatorFunction groupingAggregator(DriverContext driverContext,
+      List<Integer> channels) {
+    return DeltaFloatGroupingAggregatorFunction.create(channels, driverContext);
+  }
+
+  @Override
+  public String describe() {
+    return "delta of floats";
+  }
+}

+ 406 - 0
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaFloatGroupingAggregatorFunction.java

@@ -0,0 +1,406 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.compute.aggregation;
+
+import java.lang.Integer;
+import java.lang.Override;
+import java.lang.String;
+import java.lang.StringBuilder;
+import java.util.List;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.FloatBlock;
+import org.elasticsearch.compute.data.FloatVector;
+import org.elasticsearch.compute.data.IntArrayBlock;
+import org.elasticsearch.compute.data.IntBigArrayBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * {@link GroupingAggregatorFunction} implementation for {@link DeltaFloatAggregator}.
+ * This class is generated. Edit {@code GroupingAggregatorImplementer} instead.
+ */
+public final class DeltaFloatGroupingAggregatorFunction implements GroupingAggregatorFunction {
+  private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
+      new IntermediateStateDesc("samples", ElementType.LONG),
+      new IntermediateStateDesc("timestamps", ElementType.LONG),
+      new IntermediateStateDesc("values", ElementType.FLOAT)  );
+
+  private final DeltaFloatAggregator.FloatDeltaGroupingState state;
+
+  private final List<Integer> channels;
+
+  private final DriverContext driverContext;
+
+  public DeltaFloatGroupingAggregatorFunction(List<Integer> channels,
+      DeltaFloatAggregator.FloatDeltaGroupingState state, DriverContext driverContext) {
+    this.channels = channels;
+    this.state = state;
+    this.driverContext = driverContext;
+  }
+
+  public static DeltaFloatGroupingAggregatorFunction create(List<Integer> channels,
+      DriverContext driverContext) {
+    return new DeltaFloatGroupingAggregatorFunction(channels, DeltaFloatAggregator.initGrouping(driverContext), driverContext);
+  }
+
+  public static List<IntermediateStateDesc> intermediateStateDesc() {
+    return INTERMEDIATE_STATE_DESC;
+  }
+
+  @Override
+  public int intermediateBlockCount() {
+    return INTERMEDIATE_STATE_DESC.size();
+  }
+
+  @Override
+  public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds,
+      Page page) {
+    FloatBlock valueBlock = page.getBlock(channels.get(0));
+    LongBlock timestampBlock = page.getBlock(channels.get(1));
+    FloatVector valueVector = valueBlock.asVector();
+    if (valueVector == null) {
+      maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock);
+      return new GroupingAggregatorFunction.AddInput() {
+        @Override
+        public void add(int positionOffset, IntArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntBigArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntVector groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void close() {
+        }
+      };
+    }
+    LongVector timestampVector = timestampBlock.asVector();
+    if (timestampVector == null) {
+      maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock);
+      return new GroupingAggregatorFunction.AddInput() {
+        @Override
+        public void add(int positionOffset, IntArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntBigArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntVector groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void close() {
+        }
+      };
+    }
+    return new GroupingAggregatorFunction.AddInput() {
+      @Override
+      public void add(int positionOffset, IntArrayBlock groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void add(int positionOffset, IntBigArrayBlock groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void add(int positionOffset, IntVector groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void close() {
+      }
+    };
+  }
+
+  private void addRawInput(int positionOffset, IntArrayBlock groups, FloatBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+        int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+        for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+          float valueValue = valueBlock.getFloat(valueOffset);
+          int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+          int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+          for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+            long timestampValue = timestampBlock.getLong(timestampOffset);
+            DeltaFloatAggregator.combine(state, groupId, valueValue, timestampValue);
+          }
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntArrayBlock groups, FloatVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        float valueValue = valueVector.getFloat(valuesPosition);
+        long timestampValue = timestampVector.getLong(valuesPosition);
+        DeltaFloatAggregator.combine(state, groupId, valueValue, timestampValue);
+      }
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    FloatBlock values = (FloatBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valuesPosition = groupPosition + positionOffset;
+        DeltaFloatAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntBigArrayBlock groups, FloatBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+        int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+        for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+          float valueValue = valueBlock.getFloat(valueOffset);
+          int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+          int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+          for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+            long timestampValue = timestampBlock.getLong(timestampOffset);
+            DeltaFloatAggregator.combine(state, groupId, valueValue, timestampValue);
+          }
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntBigArrayBlock groups, FloatVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        float valueValue = valueVector.getFloat(valuesPosition);
+        long timestampValue = timestampVector.getLong(valuesPosition);
+        DeltaFloatAggregator.combine(state, groupId, valueValue, timestampValue);
+      }
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    FloatBlock values = (FloatBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valuesPosition = groupPosition + positionOffset;
+        DeltaFloatAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntVector groups, FloatBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupId = groups.getInt(groupPosition);
+      int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+      int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+      for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+        float valueValue = valueBlock.getFloat(valueOffset);
+        int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+        int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+        for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+          long timestampValue = timestampBlock.getLong(timestampOffset);
+          DeltaFloatAggregator.combine(state, groupId, valueValue, timestampValue);
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntVector groups, FloatVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int valuesPosition = groupPosition + positionOffset;
+      int groupId = groups.getInt(groupPosition);
+      float valueValue = valueVector.getFloat(valuesPosition);
+      long timestampValue = timestampVector.getLong(valuesPosition);
+      DeltaFloatAggregator.combine(state, groupId, valueValue, timestampValue);
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntVector groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    FloatBlock values = (FloatBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int groupId = groups.getInt(groupPosition);
+      int valuesPosition = groupPosition + positionOffset;
+      DeltaFloatAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+    }
+  }
+
+  private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, FloatBlock valueBlock,
+      LongBlock timestampBlock) {
+    if (valueBlock.mayHaveNulls()) {
+      state.enableGroupIdTracking(seenGroupIds);
+    }
+    if (timestampBlock.mayHaveNulls()) {
+      state.enableGroupIdTracking(seenGroupIds);
+    }
+  }
+
+  @Override
+  public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) {
+    state.enableGroupIdTracking(seenGroupIds);
+  }
+
+  @Override
+  public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
+    state.toIntermediate(blocks, offset, selected, driverContext);
+  }
+
+  @Override
+  public void evaluateFinal(Block[] blocks, int offset, IntVector selected,
+      GroupingAggregatorEvaluationContext ctx) {
+    blocks[offset] = DeltaFloatAggregator.evaluateFinal(state, selected, ctx);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getSimpleName()).append("[");
+    sb.append("channels=").append(channels);
+    sb.append("]");
+    return sb.toString();
+  }
+
+  @Override
+  public void close() {
+    state.close();
+  }
+}

+ 46 - 0
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaIntAggregatorFunctionSupplier.java

@@ -0,0 +1,46 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.compute.aggregation;
+
+import java.lang.Integer;
+import java.lang.Override;
+import java.lang.String;
+import java.util.List;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * {@link AggregatorFunctionSupplier} implementation for {@link DeltaIntAggregator}.
+ * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
+ */
+public final class DeltaIntAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
+  public DeltaIntAggregatorFunctionSupplier() {
+  }
+
+  @Override
+  public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
+    return DeltaIntGroupingAggregatorFunction.intermediateStateDesc();
+  }
+
+  @Override
+  public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public DeltaIntGroupingAggregatorFunction groupingAggregator(DriverContext driverContext,
+      List<Integer> channels) {
+    return DeltaIntGroupingAggregatorFunction.create(channels, driverContext);
+  }
+
+  @Override
+  public String describe() {
+    return "delta of ints";
+  }
+}

+ 405 - 0
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaIntGroupingAggregatorFunction.java

@@ -0,0 +1,405 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.compute.aggregation;
+
+import java.lang.Integer;
+import java.lang.Override;
+import java.lang.String;
+import java.lang.StringBuilder;
+import java.util.List;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntArrayBlock;
+import org.elasticsearch.compute.data.IntBigArrayBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * {@link GroupingAggregatorFunction} implementation for {@link DeltaIntAggregator}.
+ * This class is generated. Edit {@code GroupingAggregatorImplementer} instead.
+ */
+public final class DeltaIntGroupingAggregatorFunction implements GroupingAggregatorFunction {
+  private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
+      new IntermediateStateDesc("samples", ElementType.LONG),
+      new IntermediateStateDesc("timestamps", ElementType.LONG),
+      new IntermediateStateDesc("values", ElementType.INT)  );
+
+  private final DeltaIntAggregator.IntDeltaGroupingState state;
+
+  private final List<Integer> channels;
+
+  private final DriverContext driverContext;
+
+  public DeltaIntGroupingAggregatorFunction(List<Integer> channels,
+      DeltaIntAggregator.IntDeltaGroupingState state, DriverContext driverContext) {
+    this.channels = channels;
+    this.state = state;
+    this.driverContext = driverContext;
+  }
+
+  public static DeltaIntGroupingAggregatorFunction create(List<Integer> channels,
+      DriverContext driverContext) {
+    return new DeltaIntGroupingAggregatorFunction(channels, DeltaIntAggregator.initGrouping(driverContext), driverContext);
+  }
+
+  public static List<IntermediateStateDesc> intermediateStateDesc() {
+    return INTERMEDIATE_STATE_DESC;
+  }
+
+  @Override
+  public int intermediateBlockCount() {
+    return INTERMEDIATE_STATE_DESC.size();
+  }
+
+  @Override
+  public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds,
+      Page page) {
+    IntBlock valueBlock = page.getBlock(channels.get(0));
+    LongBlock timestampBlock = page.getBlock(channels.get(1));
+    IntVector valueVector = valueBlock.asVector();
+    if (valueVector == null) {
+      maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock);
+      return new GroupingAggregatorFunction.AddInput() {
+        @Override
+        public void add(int positionOffset, IntArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntBigArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntVector groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void close() {
+        }
+      };
+    }
+    LongVector timestampVector = timestampBlock.asVector();
+    if (timestampVector == null) {
+      maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock);
+      return new GroupingAggregatorFunction.AddInput() {
+        @Override
+        public void add(int positionOffset, IntArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntBigArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntVector groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void close() {
+        }
+      };
+    }
+    return new GroupingAggregatorFunction.AddInput() {
+      @Override
+      public void add(int positionOffset, IntArrayBlock groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void add(int positionOffset, IntBigArrayBlock groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void add(int positionOffset, IntVector groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void close() {
+      }
+    };
+  }
+
+  private void addRawInput(int positionOffset, IntArrayBlock groups, IntBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+        int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+        for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+          int valueValue = valueBlock.getInt(valueOffset);
+          int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+          int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+          for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+            long timestampValue = timestampBlock.getLong(timestampOffset);
+            DeltaIntAggregator.combine(state, groupId, valueValue, timestampValue);
+          }
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntArrayBlock groups, IntVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valueValue = valueVector.getInt(valuesPosition);
+        long timestampValue = timestampVector.getLong(valuesPosition);
+        DeltaIntAggregator.combine(state, groupId, valueValue, timestampValue);
+      }
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    IntBlock values = (IntBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valuesPosition = groupPosition + positionOffset;
+        DeltaIntAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntBigArrayBlock groups, IntBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+        int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+        for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+          int valueValue = valueBlock.getInt(valueOffset);
+          int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+          int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+          for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+            long timestampValue = timestampBlock.getLong(timestampOffset);
+            DeltaIntAggregator.combine(state, groupId, valueValue, timestampValue);
+          }
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntBigArrayBlock groups, IntVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valueValue = valueVector.getInt(valuesPosition);
+        long timestampValue = timestampVector.getLong(valuesPosition);
+        DeltaIntAggregator.combine(state, groupId, valueValue, timestampValue);
+      }
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    IntBlock values = (IntBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valuesPosition = groupPosition + positionOffset;
+        DeltaIntAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntVector groups, IntBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupId = groups.getInt(groupPosition);
+      int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+      int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+      for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+        int valueValue = valueBlock.getInt(valueOffset);
+        int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+        int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+        for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+          long timestampValue = timestampBlock.getLong(timestampOffset);
+          DeltaIntAggregator.combine(state, groupId, valueValue, timestampValue);
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntVector groups, IntVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int valuesPosition = groupPosition + positionOffset;
+      int groupId = groups.getInt(groupPosition);
+      int valueValue = valueVector.getInt(valuesPosition);
+      long timestampValue = timestampVector.getLong(valuesPosition);
+      DeltaIntAggregator.combine(state, groupId, valueValue, timestampValue);
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntVector groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    IntBlock values = (IntBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int groupId = groups.getInt(groupPosition);
+      int valuesPosition = groupPosition + positionOffset;
+      DeltaIntAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+    }
+  }
+
+  private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, IntBlock valueBlock,
+      LongBlock timestampBlock) {
+    if (valueBlock.mayHaveNulls()) {
+      state.enableGroupIdTracking(seenGroupIds);
+    }
+    if (timestampBlock.mayHaveNulls()) {
+      state.enableGroupIdTracking(seenGroupIds);
+    }
+  }
+
+  @Override
+  public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) {
+    state.enableGroupIdTracking(seenGroupIds);
+  }
+
+  @Override
+  public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
+    state.toIntermediate(blocks, offset, selected, driverContext);
+  }
+
+  @Override
+  public void evaluateFinal(Block[] blocks, int offset, IntVector selected,
+      GroupingAggregatorEvaluationContext ctx) {
+    blocks[offset] = DeltaIntAggregator.evaluateFinal(state, selected, ctx);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getSimpleName()).append("[");
+    sb.append("channels=").append(channels);
+    sb.append("]");
+    return sb.toString();
+  }
+
+  @Override
+  public void close() {
+    state.close();
+  }
+}

+ 46 - 0
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaLongAggregatorFunctionSupplier.java

@@ -0,0 +1,46 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.compute.aggregation;
+
+import java.lang.Integer;
+import java.lang.Override;
+import java.lang.String;
+import java.util.List;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * {@link AggregatorFunctionSupplier} implementation for {@link DeltaLongAggregator}.
+ * This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
+ */
+public final class DeltaLongAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
+  public DeltaLongAggregatorFunctionSupplier() {
+  }
+
+  @Override
+  public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
+    return DeltaLongGroupingAggregatorFunction.intermediateStateDesc();
+  }
+
+  @Override
+  public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
+    throw new UnsupportedOperationException("non-grouping aggregator is not supported");
+  }
+
+  @Override
+  public DeltaLongGroupingAggregatorFunction groupingAggregator(DriverContext driverContext,
+      List<Integer> channels) {
+    return DeltaLongGroupingAggregatorFunction.create(channels, driverContext);
+  }
+
+  @Override
+  public String describe() {
+    return "delta of longs";
+  }
+}

+ 404 - 0
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/DeltaLongGroupingAggregatorFunction.java

@@ -0,0 +1,404 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.compute.aggregation;
+
+import java.lang.Integer;
+import java.lang.Override;
+import java.lang.String;
+import java.lang.StringBuilder;
+import java.util.List;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntArrayBlock;
+import org.elasticsearch.compute.data.IntBigArrayBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+
+/**
+ * {@link GroupingAggregatorFunction} implementation for {@link DeltaLongAggregator}.
+ * This class is generated. Edit {@code GroupingAggregatorImplementer} instead.
+ */
+public final class DeltaLongGroupingAggregatorFunction implements GroupingAggregatorFunction {
+  private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
+      new IntermediateStateDesc("samples", ElementType.LONG),
+      new IntermediateStateDesc("timestamps", ElementType.LONG),
+      new IntermediateStateDesc("values", ElementType.LONG)  );
+
+  private final DeltaLongAggregator.LongDeltaGroupingState state;
+
+  private final List<Integer> channels;
+
+  private final DriverContext driverContext;
+
+  public DeltaLongGroupingAggregatorFunction(List<Integer> channels,
+      DeltaLongAggregator.LongDeltaGroupingState state, DriverContext driverContext) {
+    this.channels = channels;
+    this.state = state;
+    this.driverContext = driverContext;
+  }
+
+  public static DeltaLongGroupingAggregatorFunction create(List<Integer> channels,
+      DriverContext driverContext) {
+    return new DeltaLongGroupingAggregatorFunction(channels, DeltaLongAggregator.initGrouping(driverContext), driverContext);
+  }
+
+  public static List<IntermediateStateDesc> intermediateStateDesc() {
+    return INTERMEDIATE_STATE_DESC;
+  }
+
+  @Override
+  public int intermediateBlockCount() {
+    return INTERMEDIATE_STATE_DESC.size();
+  }
+
+  @Override
+  public GroupingAggregatorFunction.AddInput prepareProcessRawInputPage(SeenGroupIds seenGroupIds,
+      Page page) {
+    LongBlock valueBlock = page.getBlock(channels.get(0));
+    LongBlock timestampBlock = page.getBlock(channels.get(1));
+    LongVector valueVector = valueBlock.asVector();
+    if (valueVector == null) {
+      maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock);
+      return new GroupingAggregatorFunction.AddInput() {
+        @Override
+        public void add(int positionOffset, IntArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntBigArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntVector groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void close() {
+        }
+      };
+    }
+    LongVector timestampVector = timestampBlock.asVector();
+    if (timestampVector == null) {
+      maybeEnableGroupIdTracking(seenGroupIds, valueBlock, timestampBlock);
+      return new GroupingAggregatorFunction.AddInput() {
+        @Override
+        public void add(int positionOffset, IntArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntBigArrayBlock groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void add(int positionOffset, IntVector groupIds) {
+          addRawInput(positionOffset, groupIds, valueBlock, timestampBlock);
+        }
+
+        @Override
+        public void close() {
+        }
+      };
+    }
+    return new GroupingAggregatorFunction.AddInput() {
+      @Override
+      public void add(int positionOffset, IntArrayBlock groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void add(int positionOffset, IntBigArrayBlock groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void add(int positionOffset, IntVector groupIds) {
+        addRawInput(positionOffset, groupIds, valueVector, timestampVector);
+      }
+
+      @Override
+      public void close() {
+      }
+    };
+  }
+
+  private void addRawInput(int positionOffset, IntArrayBlock groups, LongBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+        int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+        for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+          long valueValue = valueBlock.getLong(valueOffset);
+          int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+          int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+          for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+            long timestampValue = timestampBlock.getLong(timestampOffset);
+            DeltaLongAggregator.combine(state, groupId, valueValue, timestampValue);
+          }
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntArrayBlock groups, LongVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        long valueValue = valueVector.getLong(valuesPosition);
+        long timestampValue = timestampVector.getLong(valuesPosition);
+        DeltaLongAggregator.combine(state, groupId, valueValue, timestampValue);
+      }
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntArrayBlock groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock values = (LongBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valuesPosition = groupPosition + positionOffset;
+        DeltaLongAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntBigArrayBlock groups, LongBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+        int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+        for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+          long valueValue = valueBlock.getLong(valueOffset);
+          int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+          int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+          for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+            long timestampValue = timestampBlock.getLong(timestampOffset);
+            DeltaLongAggregator.combine(state, groupId, valueValue, timestampValue);
+          }
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntBigArrayBlock groups, LongVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int valuesPosition = groupPosition + positionOffset;
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        long valueValue = valueVector.getLong(valuesPosition);
+        long timestampValue = timestampVector.getLong(valuesPosition);
+        DeltaLongAggregator.combine(state, groupId, valueValue, timestampValue);
+      }
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntBigArrayBlock groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock values = (LongBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      if (groups.isNull(groupPosition)) {
+        continue;
+      }
+      int groupStart = groups.getFirstValueIndex(groupPosition);
+      int groupEnd = groupStart + groups.getValueCount(groupPosition);
+      for (int g = groupStart; g < groupEnd; g++) {
+        int groupId = groups.getInt(g);
+        int valuesPosition = groupPosition + positionOffset;
+        DeltaLongAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntVector groups, LongBlock valueBlock,
+      LongBlock timestampBlock) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int valuesPosition = groupPosition + positionOffset;
+      if (valueBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      if (timestampBlock.isNull(valuesPosition)) {
+        continue;
+      }
+      int groupId = groups.getInt(groupPosition);
+      int valueStart = valueBlock.getFirstValueIndex(valuesPosition);
+      int valueEnd = valueStart + valueBlock.getValueCount(valuesPosition);
+      for (int valueOffset = valueStart; valueOffset < valueEnd; valueOffset++) {
+        long valueValue = valueBlock.getLong(valueOffset);
+        int timestampStart = timestampBlock.getFirstValueIndex(valuesPosition);
+        int timestampEnd = timestampStart + timestampBlock.getValueCount(valuesPosition);
+        for (int timestampOffset = timestampStart; timestampOffset < timestampEnd; timestampOffset++) {
+          long timestampValue = timestampBlock.getLong(timestampOffset);
+          DeltaLongAggregator.combine(state, groupId, valueValue, timestampValue);
+        }
+      }
+    }
+  }
+
+  private void addRawInput(int positionOffset, IntVector groups, LongVector valueVector,
+      LongVector timestampVector) {
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int valuesPosition = groupPosition + positionOffset;
+      int groupId = groups.getInt(groupPosition);
+      long valueValue = valueVector.getLong(valuesPosition);
+      long timestampValue = timestampVector.getLong(valuesPosition);
+      DeltaLongAggregator.combine(state, groupId, valueValue, timestampValue);
+    }
+  }
+
+  @Override
+  public void addIntermediateInput(int positionOffset, IntVector groups, Page page) {
+    state.enableGroupIdTracking(new SeenGroupIds.Empty());
+    assert channels.size() == intermediateBlockCount();
+    Block samplesUncast = page.getBlock(channels.get(0));
+    if (samplesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock samples = (LongBlock) samplesUncast;
+    Block timestampsUncast = page.getBlock(channels.get(1));
+    if (timestampsUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock timestamps = (LongBlock) timestampsUncast;
+    Block valuesUncast = page.getBlock(channels.get(2));
+    if (valuesUncast.areAllValuesNull()) {
+      return;
+    }
+    LongBlock values = (LongBlock) valuesUncast;
+    assert samples.getPositionCount() == timestamps.getPositionCount() && samples.getPositionCount() == values.getPositionCount();
+    for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
+      int groupId = groups.getInt(groupPosition);
+      int valuesPosition = groupPosition + positionOffset;
+      DeltaLongAggregator.combineIntermediate(state, groupId, samples, timestamps, values, valuesPosition);
+    }
+  }
+
+  private void maybeEnableGroupIdTracking(SeenGroupIds seenGroupIds, LongBlock valueBlock,
+      LongBlock timestampBlock) {
+    if (valueBlock.mayHaveNulls()) {
+      state.enableGroupIdTracking(seenGroupIds);
+    }
+    if (timestampBlock.mayHaveNulls()) {
+      state.enableGroupIdTracking(seenGroupIds);
+    }
+  }
+
+  @Override
+  public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) {
+    state.enableGroupIdTracking(seenGroupIds);
+  }
+
+  @Override
+  public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
+    state.toIntermediate(blocks, offset, selected, driverContext);
+  }
+
+  @Override
+  public void evaluateFinal(Block[] blocks, int offset, IntVector selected,
+      GroupingAggregatorEvaluationContext ctx) {
+    blocks[offset] = DeltaLongAggregator.evaluateFinal(state, selected, ctx);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getSimpleName()).append("[");
+    sb.append("channels=").append(channels);
+    sb.append("]");
+    return sb.toString();
+  }
+
+  @Override
+  public void close() {
+    state.close();
+  }
+}

+ 250 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-DeltaAggregator.java.st

@@ -0,0 +1,250 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.aggregation;
+
+// begin generated imports
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ObjectArray;
+import org.elasticsearch.compute.ann.GroupingAggregator;
+import org.elasticsearch.compute.ann.IntermediateState;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.FloatBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+// end generated imports
+
+/**
+ * A rate grouping aggregation definition for $type$. This implementation supports the `Delta` and `idelta` functions.
+ * This class is generated. Edit `X-DeltaAggregator.java.st` instead.
+ */
+@GroupingAggregator(
+    value = {
+        @IntermediateState(name = "samples", type = "LONG_BLOCK"),
+        @IntermediateState(name = "timestamps", type = "LONG_BLOCK"),
+        @IntermediateState(name = "values", type = "$TYPE$_BLOCK") }
+)
+public class Delta$Type$Aggregator {
+    public static $Type$DeltaGroupingState initGrouping(DriverContext driverContext) {
+        return new $Type$DeltaGroupingState(driverContext.bigArrays(), driverContext.breaker());
+    }
+
+    public static void combine($Type$DeltaGroupingState current, int groupId, $type$ value, long timestamp) {
+        current.ensureCapacity(groupId);
+        current.append(groupId, timestamp, value);
+    }
+
+    public static void combineIntermediate(
+        $Type$DeltaGroupingState current,
+        int groupId,
+        LongBlock samples,
+        LongBlock timestamps,
+        $Type$Block values,
+        int otherPosition
+    ) {
+        current.combine(groupId, samples, timestamps, values, otherPosition);
+    }
+
+    public static Block evaluateFinal($Type$DeltaGroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+        return state.evaluateFinal(selected, evalContext);
+    }
+
+    private static class $Type$DeltaState {
+        static final long BASE_RAM_USAGE = RamUsageEstimator.sizeOfObject($Type$DeltaState.class);
+        long lastTimestamp = -1;
+        long firstTimestamp = Long.MAX_VALUE;
+        $type$ lastValue;
+        $type$ firstValue;
+        long valuesSeen;
+
+        $Type$DeltaState(long seenTs, $type$ seenValue) {
+            this.lastTimestamp = seenTs;
+            this.lastValue = seenValue;
+            this.firstTimestamp = seenTs;
+            this.firstValue = seenValue;
+            this.valuesSeen = 1L;
+        }
+
+        long bytesUsed() {
+            return BASE_RAM_USAGE;
+        }
+    }
+
+    public static final class $Type$DeltaGroupingState implements Releasable, Accountable, GroupingAggregatorState {
+        private ObjectArray<$Type$DeltaState> states;
+        private final BigArrays bigArrays;
+        private final CircuitBreaker breaker;
+        private long stateBytes; // for individual states
+
+        $Type$DeltaGroupingState(BigArrays bigArrays, CircuitBreaker breaker) {
+            this.bigArrays = bigArrays;
+            this.breaker = breaker;
+            this.states = bigArrays.newObjectArray(1);
+        }
+
+        void ensureCapacity(int groupId) {
+            states = bigArrays.grow(states, groupId + 1);
+        }
+
+        void adjustBreaker(long bytes) {
+            breaker.addEstimateBytesAndMaybeBreak(bytes, "<<delta aggregation>>");
+            stateBytes += bytes;
+            assert stateBytes >= 0 : stateBytes;
+        }
+
+        void append(int groupId, long timestamp, $type$ value) {
+            var state = states.get(groupId);
+            if (state == null) {
+                state = new $Type$DeltaState(timestamp, value);
+                states.set(groupId, state);
+                adjustBreaker(state.bytesUsed());
+            } else {
+                if (timestamp >= state.lastTimestamp) {
+                    state.lastTimestamp = timestamp;
+                    state.lastValue = value;
+                    state.valuesSeen++;
+                } else if (timestamp <= state.firstTimestamp) {
+                    state.firstTimestamp = timestamp;
+                    state.firstValue = value;
+                    state.valuesSeen++;
+                } // else: ignore, too old
+            }
+        }
+
+        void combine(int groupId, LongBlock samples, LongBlock timestamps, $Type$Block values, int otherPosition) {
+            final int valueCount = timestamps.getValueCount(otherPosition);
+            if (valueCount == 0) {
+                return;
+            }
+            final long valuesSeen = samples.getLong(samples.getFirstValueIndex(otherPosition));
+            final int firstTs = timestamps.getFirstValueIndex(otherPosition);
+            final int firstIndex = values.getFirstValueIndex(otherPosition);
+            ensureCapacity(groupId);
+            append(groupId, timestamps.getLong(firstTs), values.get$Type$(firstIndex));
+            if (valueCount > 1) {
+                ensureCapacity(groupId);
+                append(groupId, timestamps.getLong(firstTs + 1), values.get$Type$(firstIndex + 1));
+            }
+            // We are merging the state from upstream, which means we have seen
+            // `valuesSeen` values, but we have already counted one or two of them,
+            // which is represented by `valueCount - 1`.
+            states.get(groupId).valuesSeen += valuesSeen - valueCount;
+        }
+
+        @Override
+        public long ramBytesUsed() {
+            return states.ramBytesUsed() + stateBytes;
+        }
+
+        @Override
+        public void close() {
+            Releasables.close(states, () -> adjustBreaker(-stateBytes));
+        }
+
+        @Override
+        public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
+            assert blocks.length >= offset + 2 : "blocks=" + blocks.length + ",offset=" + offset;
+            final BlockFactory blockFactory = driverContext.blockFactory();
+            final int positionCount = selected.getPositionCount();
+            try (
+                LongBlock.Builder samples = blockFactory.newLongBlockBuilder(positionCount);
+                LongBlock.Builder timestamps = blockFactory.newLongBlockBuilder(positionCount * 2);
+                $Type$Block.Builder values = blockFactory.new$Type$BlockBuilder(positionCount * 2);
+            ) {
+                for (int i = 0; i < positionCount; i++) {
+                    final var groupId = selected.getInt(i);
+                    final var state = groupId < states.size() ? states.get(groupId) : null;
+                    if (state != null) {
+                        samples.beginPositionEntry();
+                        samples.appendLong(state.valuesSeen);
+                        samples.endPositionEntry();
+                        timestamps.beginPositionEntry();
+                        timestamps.appendLong(state.lastTimestamp);
+                        if (state.valuesSeen > 1) {
+                            timestamps.appendLong(state.firstTimestamp);
+                        }
+                        timestamps.endPositionEntry();
+
+                        values.beginPositionEntry();
+                        values.append$Type$(state.lastValue);
+                        if (state.valuesSeen > 1) {
+                            values.append$Type$(state.firstValue);
+                        }
+                        values.endPositionEntry();
+                    } else {
+                        samples.appendLong(0L);
+                        timestamps.appendNull();
+                        values.appendNull();
+                    }
+                }
+                blocks[offset] = samples.build();
+                blocks[offset + 1] = timestamps.build();
+                blocks[offset + 2] = values.build();
+            }
+        }
+
+        Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
+            int positionCount = selected.getPositionCount();
+            try (DoubleBlock.Builder rates = evalContext.blockFactory().newDoubleBlockBuilder(positionCount)) {
+                for (int p = 0; p < positionCount; p++) {
+                    final var groupId = selected.getInt(p);
+                    final var state = groupId < states.size() ? states.get(groupId) : null;
+                    if (state == null || state.valuesSeen < 2) {
+                        rates.appendNull();
+                        continue;
+                    }
+                    if (evalContext instanceof TimeSeriesGroupingAggregatorEvaluationContext tsContext) {
+                        // At this point we want to apply extrapolation
+                        var rangeStart = tsContext.rangeStartInMillis(groupId);
+                        var rangeEnd = tsContext.rangeEndInMillis(groupId);
+                        if (state.lastTimestamp - state.firstTimestamp == 0) {
+                            rates.appendNull();
+                            continue;
+                        }
+                        double startGap = state.firstTimestamp - rangeStart;
+                        final double averageSampleInterval = (state.lastTimestamp - state.firstTimestamp) / state.valuesSeen;
+                        final double slope = (state.lastValue - state.firstValue) / (state.lastTimestamp - state.firstTimestamp);
+                        double endGap = rangeEnd - state.lastTimestamp;
+                        double calculatedFirstValue = state.firstValue;
+                        if (startGap > 0) {
+                            if (startGap > averageSampleInterval * 1.1) {
+                                startGap = averageSampleInterval / 2.0;
+                            }
+                            calculatedFirstValue = calculatedFirstValue - startGap * slope;
+                        }
+                        double calculatedLastValue = state.lastValue;
+                        if (endGap > 0) {
+                            if (endGap > averageSampleInterval * 1.1) {
+                                endGap = averageSampleInterval / 2.0;
+                            }
+                            calculatedLastValue = calculatedLastValue + endGap * slope;
+                        }
+                        rates.appendDouble(calculatedLastValue - calculatedFirstValue);
+                    } else {
+                        rates.appendDouble(state.lastValue - state.firstValue);
+                    }
+                }
+                return rates.build();
+            }
+        }
+
+        @Override
+        public void enableGroupIdTracking(SeenGroupIds seenGroupIds) {
+            // noop - we handle the null states inside `toIntermediate` and `evaluateFinal`
+        }
+    }
+}

+ 171 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-delta.csv-spec

@@ -0,0 +1,171 @@
+delta_of_double_no_grouping
+required_capability: ts_command_v0
+required_capability: delta_ts_agg
+TS k8s
+| STATS cost_change=sum(delta(network.cost)) BY time_bucket = bucket(@timestamp,1minute)
+| SORT cost_change DESC, time_bucket DESC | LIMIT 10;
+
+cost_change:double | time_bucket:datetime
+null               | 2024-05-10T00:01:00.000Z
+22.5701486013986   | 2024-05-10T00:20:00.000Z
+13.259615384615385 | 2024-05-10T00:13:00.000Z
+11.486895161290322 | 2024-05-10T00:12:00.000Z
+11.066666666666666 | 2024-05-10T00:21:00.000Z
+11.0625            | 2024-05-10T00:22:00.000Z
+9.222489316239315  | 2024-05-10T00:09:00.000Z
+6.991071428571427  | 2024-05-10T00:15:00.000Z
+5.833333333333333  | 2024-05-10T00:10:00.000Z
+5.03125            | 2024-05-10T00:00:00.000Z
+
+;
+
+delta_of_integer
+required_capability: ts_command_v0
+required_capability: delta_ts_agg
+required_capability: k8s_dataset_additional_fields
+TS k8s | STATS clients = avg(delta(network.eth0.currently_connected_clients)) BY time_bucket = bucket(@timestamp,1minute) | SORT time_bucket | LIMIT 10;
+
+clients:double      | time_bucket:datetime
+-520.0              | 2024-05-10T00:00:00.000Z
+null                | 2024-05-10T00:01:00.000Z
+-248.5              | 2024-05-10T00:02:00.000Z
+-179.0              | 2024-05-10T00:03:00.000Z
+289.0               | 2024-05-10T00:04:00.000Z
+-436.0              | 2024-05-10T00:05:00.000Z
+117.25              | 2024-05-10T00:06:00.000Z
+-214.0              | 2024-05-10T00:07:00.000Z
+-80.66666666666667  | 2024-05-10T00:08:00.000Z
+-46.714285714285715 | 2024-05-10T00:09:00.000Z
+
+;
+
+delta_of_integer_grouping
+required_capability: ts_command_v0
+required_capability: delta_ts_agg
+required_capability: k8s_dataset_additional_fields
+TS k8s | STATS clients = avg(delta(network.eth0.currently_connected_clients)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT time_bucket, cluster | LIMIT 10;
+
+clients:double | cluster:keyword | time_bucket:datetime
+-520.0         | prod            | 2024-05-10T00:00:00.000Z
+null           | staging         | 2024-05-10T00:00:00.000Z
+null           | prod            | 2024-05-10T00:01:00.000Z
+null           | qa              | 2024-05-10T00:01:00.000Z
+-54.0          | prod            | 2024-05-10T00:02:00.000Z
+null           | qa              | 2024-05-10T00:02:00.000Z
+-443.0         | staging         | 2024-05-10T00:02:00.000Z
+null           | prod            | 2024-05-10T00:03:00.000Z
+93.0           | qa              | 2024-05-10T00:03:00.000Z
+-315.0         | staging         | 2024-05-10T00:03:00.000Z
+
+;
+
+delta_with_filtering
+required_capability: ts_command_v0
+required_capability: delta_ts_agg
+TS k8s | WHERE pod == "one" | STATS tx = sum(delta(network.bytes_in)) BY cluster, time_bucket = bucket(@timestamp, 10minute) | SORT time_bucket, cluster | LIMIT 10;
+
+tx:double | cluster:keyword | time_bucket:datetime
+-351.0    | prod            | 2024-05-10T00:00:00.000Z
+552.0     | qa              | 2024-05-10T00:00:00.000Z
+127.0     | staging         | 2024-05-10T00:00:00.000Z
+280.0     | prod            | 2024-05-10T00:10:00.000Z
+73.0      | qa              | 2024-05-10T00:10:00.000Z
+-600.0    | staging         | 2024-05-10T00:10:00.000Z
+-22.0     | prod            | 2024-05-10T00:20:00.000Z
+-711.0    | qa              | 2024-05-10T00:20:00.000Z
+-511.0    | staging         | 2024-05-10T00:20:00.000Z
+;
+
+delta_with_inline_filtering
+required_capability: ts_command_v0
+required_capability: delta_ts_agg
+TS k8s | STATS tx = sum(delta(network.bytes_in)) WHERE pod == "one"  BY cluster, time_bucket = bucket(@timestamp, 10minute) | SORT time_bucket, cluster | LIMIT 10;
+
+tx:double | cluster:keyword | time_bucket:datetime
+-351.0    | prod            | 2024-05-10T00:00:00.000Z
+552.0     | qa              | 2024-05-10T00:00:00.000Z
+127.0     | staging         | 2024-05-10T00:00:00.000Z
+280.0     | prod            | 2024-05-10T00:10:00.000Z
+73.0      | qa              | 2024-05-10T00:10:00.000Z
+-600.0    | staging         | 2024-05-10T00:10:00.000Z
+-22.0     | prod            | 2024-05-10T00:20:00.000Z
+-711.0    | qa              | 2024-05-10T00:20:00.000Z
+-511.0    | staging         | 2024-05-10T00:20:00.000Z
+;
+
+eval_on_delta
+required_capability: ts_command_v0
+required_capability: delta_ts_agg
+TS k8s | STATS avg_bytes = avg(delta(network.bytes_in)) BY cluster, time_bucket = bucket(@timestamp, 10minute) | EVAL kb_minus_offset = (avg_bytes - 100) / 1000.0 | LIMIT 10 | SORT time_bucket, cluster ;
+
+avg_bytes:double    | cluster:keyword | time_bucket:datetime     | kb_minus_offset:double
+-199.66666666666666 | prod            | 2024-05-10T00:00:00.000Z | -0.29966666666666664
+-68.33333333333333  | qa              | 2024-05-10T00:00:00.000Z | -0.1683333333333333
+-26.666666666666668 | staging         | 2024-05-10T00:00:00.000Z | -0.12666666666666668
+140.33333333333334  | prod            | 2024-05-10T00:10:00.000Z | 0.040333333333333346
+322.3333333333333   | qa              | 2024-05-10T00:10:00.000Z | 0.22233333333333333
+196.33333333333334  | staging         | 2024-05-10T00:10:00.000Z | 0.09633333333333334
+56.0                | prod            | 2024-05-10T00:20:00.000Z | -0.044
+-367.0              | qa              | 2024-05-10T00:20:00.000Z | -0.467
+-505.0              | staging         | 2024-05-10T00:20:00.000Z | -0.605
+;
+
+delta_multi_values
+required_capability: ts_command_v0
+required_capability: delta_ts_agg
+required_capability: k8s_dataset_additional_fields
+TS k8s | WHERE @timestamp < "2024-05-10T00:10:00.000Z" | STATS events = sum(delta(events_received)) by pod, time_bucket = bucket(@timestamp, 1minute) | SORT events desc, pod, time_bucket  | LIMIT 10;
+
+events:double | pod:keyword | time_bucket:datetime
+null          | one         | 2024-05-10T00:01:00.000Z
+null          | one         | 2024-05-10T00:04:00.000Z
+null          | three       | 2024-05-10T00:01:00.000Z
+null          | three       | 2024-05-10T00:02:00.000Z
+null          | three       | 2024-05-10T00:05:00.000Z
+null          | three       | 2024-05-10T00:08:00.000Z
+null          | two         | 2024-05-10T00:00:00.000Z
+null          | two         | 2024-05-10T00:01:00.000Z
+null          | two         | 2024-05-10T00:07:00.000Z
+9.0           | one         | 2024-05-10T00:03:00.000Z
+
+;
+
+delta_null_values
+required_capability: ts_command_v0
+required_capability: delta_ts_agg
+required_capability: k8s_dataset_additional_fields
+TS k8s | WHERE @timestamp > "2024-05-10T00:10:00.000Z" and @timestamp < "2024-05-10T00:15:00.000Z" | STATS events = sum(delta(events_received)) by pod, time_bucket = bucket(@timestamp, 1minute) | SORT events desc, pod, time_bucket  | LIMIT 10;
+
+events:double | pod:keyword | time_bucket:datetime
+null          | one         | 2024-05-10T00:10:00.000Z
+null          | one         | 2024-05-10T00:11:00.000Z
+null          | one         | 2024-05-10T00:12:00.000Z
+null          | one         | 2024-05-10T00:14:00.000Z
+null          | three       | 2024-05-10T00:11:00.000Z
+null          | three       | 2024-05-10T00:12:00.000Z
+null          | three       | 2024-05-10T00:14:00.000Z
+null          | two         | 2024-05-10T00:10:00.000Z
+null          | two         | 2024-05-10T00:11:00.000Z
+null          | two         | 2024-05-10T00:13:00.000Z
+
+
+;
+
+delta_all_value_types
+required_capability: ts_command_v0
+required_capability: delta_ts_agg
+required_capability: k8s_dataset_additional_fields
+TS k8s | STATS events = sum(delta(events_received)) by pod, time_bucket = bucket(@timestamp, 10minute) | SORT events desc, pod, time_bucket | LIMIT 10 ;
+
+events:double | pod:keyword | time_bucket:datetime
+18.0          | three       | 2024-05-10T00:10:00.000Z
+7.0           | one         | 2024-05-10T00:00:00.000Z
+2.0           | two         | 2024-05-10T00:10:00.000Z
+0.0           | one         | 2024-05-10T00:10:00.000Z
+-1.0          | three       | 2024-05-10T00:20:00.000Z
+-1.0          | two         | 2024-05-10T00:20:00.000Z
+-4.0          | one         | 2024-05-10T00:20:00.000Z
+-4.0          | three       | 2024-05-10T00:00:00.000Z
+-4.0          | two         | 2024-05-10T00:00:00.000Z
+
+;

+ 21 - 5
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RandomizedTimeSeriesIT.java

@@ -75,7 +75,8 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
         Tuple.tuple("rate", DeltaAgg.RATE),
         Tuple.tuple("irate", DeltaAgg.IRATE),
         Tuple.tuple("increase", DeltaAgg.INCREASE),
-        Tuple.tuple("idelta", DeltaAgg.IDELTA)
+        Tuple.tuple("idelta", DeltaAgg.IDELTA),
+        Tuple.tuple("delta", DeltaAgg.DELTA)
     );
     private static final Map<DeltaAgg, String> DELTA_AGG_METRIC_MAP = Map.of(
         DeltaAgg.RATE,
@@ -85,7 +86,9 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
         DeltaAgg.IDELTA,
         "gaugel_hdd.bytes.used",
         DeltaAgg.INCREASE,
-        "counterl_hdd.bytes.read"
+        "counterl_hdd.bytes.read",
+        DeltaAgg.DELTA,
+        "gaugel_hdd.bytes.used"
     );
 
     private List<XContentBuilder> documents;
@@ -276,7 +279,8 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
         RATE,
         IRATE,
         IDELTA,
-        INCREASE
+        INCREASE,
+        DELTA
     }
 
     // A record that holds min, max, avg, count and sum of rates calculated from a timeseries.
@@ -295,6 +299,7 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
             timeseries.sort((t1, t2) -> t1.v2().v1().compareTo(t2.v2().v1()));
             var firstTs = timeseries.getFirst().v2().v1();
             var lastTs = timeseries.getLast().v2().v1();
+            var tsDurationSeconds = (lastTs.toEpochMilli() - firstTs.toEpochMilli()) / 1000.0;
             if (deltaAgg.equals(DeltaAgg.IRATE)) {
                 var lastVal = timeseries.getLast().v2().v2();
                 var secondLastVal = timeseries.get(timeseries.size() - 2).v2().v2();
@@ -302,6 +307,17 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
                     timeseries.size() - 2
                 ).v2().v1().toEpochMilli()) * 1000;
                 return new RateRange(irate * 0.999, irate * 1.001); // Add 0.1% tolerance
+            } else if (deltaAgg.equals(DeltaAgg.DELTA)) {
+                var firstVal = timeseries.getFirst().v2().v2();
+                var lastVal = timeseries.getLast().v2().v2();
+                var delta = lastVal - firstVal;
+                // We must extrapolate the delta to the window size
+                var windowSizeFactor = secondsInWindow / tsDurationSeconds;
+                if (delta < 0) {
+                    return new RateRange(delta * windowSizeFactor * 1.001, delta * 0.999); // Add 0.1% tolerance
+                } else {
+                    return new RateRange(delta * 0.999, delta * windowSizeFactor * 1.001); // Add 0.1% tolerance
+                }
             } else if (deltaAgg.equals(DeltaAgg.IDELTA)) {
                 var lastVal = timeseries.getLast().v2().v2();
                 var secondLastVal = timeseries.get(timeseries.size() - 2).v2().v2();
@@ -332,7 +348,6 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
                 }
                 lastValue = currentValue; // Update last value for next iteration
             }
-            var tsDurationSeconds = (lastTs.toEpochMilli() - firstTs.toEpochMilli()) / 1000.0;
             if (deltaAgg.equals(DeltaAgg.INCREASE)) {
                 return new RateRange(
                     counterGrowth * 0.99, // INCREASE is RATE multiplied by the window size
@@ -432,7 +447,8 @@ public class RandomizedTimeSeriesIT extends AbstractEsqlIntegTestCase {
      * the same values from the documents in the group.
      */
     public void testRateGroupBySubset() {
-        var deltaAgg = ESTestCase.randomFrom(DELTA_AGG_OPTIONS);
+        // var deltaAgg = ESTestCase.randomFrom(DELTA_AGG_OPTIONS);
+        var deltaAgg = Tuple.tuple("delta", DeltaAgg.DELTA); // TODO: Enable random selection after fixing
         var metricName = DELTA_AGG_METRIC_MAP.get(deltaAgg.v2());
         var window = ESTestCase.randomFrom(WINDOW_OPTIONS);
         var windowSize = window.v2();

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -1206,9 +1206,10 @@ public class EsqlCapabilities {
         COUNT_DISTINCT_OVER_TIME(Build.current().isSnapshot()),
 
         /**
-         * Support for INCREASE timeseries aggregation.
+         * Support for INCREASE, DELTA timeseries aggregations.
          */
         INCREASE,
+        DELTA_TS_AGG,
 
         /**
          * Extra field types in the k8s.csv dataset

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

@@ -27,6 +27,7 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinctOverTime;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Delta;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.First;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Idelta;
@@ -510,6 +511,7 @@ public class EsqlFunctionRegistry {
                 def(Rate.class, uni(Rate::new), "rate"),
                 def(Irate.class, uni(Irate::new), "irate"),
                 def(Idelta.class, uni(Idelta::new), "idelta"),
+                def(Delta.class, uni(Delta::new), "delta"),
                 def(Increase.class, uni(Increase::new), "increase"),
                 def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
                 def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),

+ 1 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java

@@ -28,6 +28,7 @@ public class AggregateWritables {
             Irate.ENTRY,
             Idelta.ENTRY,
             Increase.ENTRY,
+            Delta.ENTRY,
             Sample.ENTRY,
             SpatialCentroid.ENTRY,
             SpatialExtent.ENTRY,

+ 142 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Delta.java

@@ -0,0 +1,142 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.aggregate;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.DeltaDoubleAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.DeltaIntAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.DeltaLongAggregatorFunctionSupplier;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesTo;
+import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesToLifecycle;
+import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
+import org.elasticsearch.xpack.esql.expression.function.FunctionType;
+import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
+import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.planner.ToAggregator;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
+import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
+
+public class Delta extends TimeSeriesAggregateFunction implements OptionalArgument, ToAggregator {
+    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Delta", Delta::new);
+
+    private final Expression timestamp;
+
+    @FunctionInfo(
+        type = FunctionType.TIME_SERIES_AGGREGATE,
+        returnType = { "double" },
+        description = "The absolute change of a gauge field in a time window.",
+        appliesTo = { @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.UNAVAILABLE) },
+        note = "Available with the [TS](/reference/query-languages/esql/commands/source-commands.md#esql-ts) command in snapshot builds"
+    )
+    public Delta(Source source, @Param(name = "field", type = { "long", "integer", "double" }) Expression field) {
+        this(source, field, new UnresolvedAttribute(source, "@timestamp"));
+    }
+
+    public Delta(Source source, @Param(name = "field", type = { "long", "integer", "double" }) Expression field, Expression timestamp) {
+        this(source, field, Literal.TRUE, timestamp);
+    }
+
+    // compatibility constructor used when reading from the stream
+    private Delta(Source source, Expression field, Expression filter, List<Expression> children) {
+        this(source, field, filter, children.getFirst());
+    }
+
+    private Delta(Source source, Expression field, Expression filter, Expression timestamp) {
+        super(source, field, filter, List.of(timestamp));
+        this.timestamp = timestamp;
+    }
+
+    public Delta(StreamInput in) throws IOException {
+        this(
+            Source.readFrom((PlanStreamInput) in),
+            in.readNamedWriteable(Expression.class),
+            in.readNamedWriteable(Expression.class),
+            in.readNamedWriteableCollectionAsList(Expression.class)
+        );
+    }
+
+    @Override
+    public String getWriteableName() {
+        return ENTRY.name;
+    }
+
+    @Override
+    protected NodeInfo<Delta> info() {
+        return NodeInfo.create(this, Delta::new, field(), timestamp);
+    }
+
+    @Override
+    public Delta replaceChildren(List<Expression> newChildren) {
+        if (newChildren.size() != 3) {
+            assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren;
+            throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren);
+        }
+        return new Delta(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
+    }
+
+    @Override
+    public Delta withFilter(Expression filter) {
+        return new Delta(source(), field(), filter, timestamp);
+    }
+
+    @Override
+    public DataType dataType() {
+        return DataType.DOUBLE;
+    }
+
+    @Override
+    protected TypeResolution resolveType() {
+        return isType(
+            field(),
+            dt -> dt.isNumeric() && dt != AGGREGATE_METRIC_DOUBLE,
+            sourceText(),
+            DEFAULT,
+            "numeric except counter types"
+        );
+    }
+
+    @Override
+    public AggregatorFunctionSupplier supplier() {
+        final DataType type = field().dataType();
+        return switch (type) {
+            case LONG -> new DeltaLongAggregatorFunctionSupplier();
+            case INTEGER -> new DeltaIntAggregatorFunctionSupplier();
+            case DOUBLE -> new DeltaDoubleAggregatorFunctionSupplier();
+            default -> throw EsqlIllegalArgumentException.illegalDataType(type);
+        };
+    }
+
+    @Override
+    public Delta perTimeSeriesAggregation() {
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return "delta(" + field() + ")";
+    }
+
+    Expression timestamp() {
+        return timestamp;
+    }
+}