Browse Source

Speedup lossy sum with constant group (#133779)

If the group is constant, we can compute the sum in a tight loop 
and update the group's value only once.
Nhat Nguyen 1 month ago
parent
commit
d3184caefe

+ 2 - 1
x-pack/plugin/esql/compute/src/main/generated/org/elasticsearch/compute/aggregation/LossySumDoubleGroupingAggregatorFunction.java

@@ -86,7 +86,7 @@ public final class LossySumDoubleGroupingAggregatorFunction implements GroupingA
         }
       };
     }
-    return new GroupingAggregatorFunction.AddInput() {
+    var addInput = new GroupingAggregatorFunction.AddInput() {
       @Override
       public void add(int positionOffset, IntArrayBlock groupIds) {
         addRawInput(positionOffset, groupIds, vVector);
@@ -106,6 +106,7 @@ public final class LossySumDoubleGroupingAggregatorFunction implements GroupingA
       public void close() {
       }
     };
+    return LossySumDoubleAggregator.wrapAddInput(addInput, state, vVector);
   }
 
   private void addRawInput(int positionOffset, IntArrayBlock groups, DoubleBlock vBlock) {

+ 41 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/LossySumDoubleAggregator.java

@@ -15,6 +15,9 @@ import org.elasticsearch.compute.ann.IntermediateState;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.DoubleVector;
+import org.elasticsearch.compute.data.IntArrayBlock;
+import org.elasticsearch.compute.data.IntBigArrayBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.core.Releasables;
@@ -121,6 +124,43 @@ class LossySumDoubleAggregator {
         }
     }
 
+    public static GroupingAggregatorFunction.AddInput wrapAddInput(
+        GroupingAggregatorFunction.AddInput delegate,
+        GroupingSumState state,
+        DoubleVector values
+    ) {
+        return new GroupingAggregatorFunction.AddInput() {
+            @Override
+            public void add(int positionOffset, IntArrayBlock groupIds) {
+                delegate.add(positionOffset, groupIds);
+            }
+
+            @Override
+            public void add(int positionOffset, IntBigArrayBlock groupIds) {
+                delegate.add(positionOffset, groupIds);
+            }
+
+            @Override
+            public void add(int positionOffset, IntVector groupIds) {
+                if (groupIds.isConstant()) {
+                    double sum = 0.0;
+                    int positionCount = groupIds.getPositionCount();
+                    for (int i = 0; i < positionCount; i++) {
+                        sum += values.getDouble(i);
+                    }
+                    state.add(sum, groupIds.getInt(0));
+                } else {
+                    delegate.add(positionOffset, groupIds);
+                }
+            }
+
+            @Override
+            public void close() {
+                Releasables.close(delegate);
+            }
+        };
+    }
+
     static final class SumState implements AggregatorState {
         private boolean seen;
         double value;
@@ -149,7 +189,7 @@ class LossySumDoubleAggregator {
             super(bigArrays);
             boolean success = false;
             try {
-                this.values = bigArrays.newDoubleArray(1);
+                this.values = bigArrays.newDoubleArray(128);
                 success = true;
             } finally {
                 if (success == false) {