1
0
Эх сурвалжийг харах

ES|QL: Make TopN aggregator use heap sort internally. (#134140)

Przemysław Witek 1 сар өмнө
parent
commit
20d6049c7f

+ 38 - 25
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/DoubleBucketedSort.java

@@ -20,7 +20,6 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.sort.BucketedSort;
 import org.elasticsearch.search.sort.SortOrder;
 
-import java.util.Arrays;
 import java.util.stream.IntStream;
 
 /**
@@ -95,7 +94,7 @@ public class DoubleBucketedSort implements Releasable {
         if (inHeapMode(bucket)) {
             if (betterThan(value, values.get(rootIndex))) {
                 values.set(rootIndex, value);
-                downHeap(rootIndex, 0);
+                downHeap(rootIndex, 0, bucketSize);
             }
             return;
         }
@@ -111,7 +110,7 @@ public class DoubleBucketedSort implements Releasable {
         values.set(index, value);
         if (next == 0) {
             heapMode.set(bucket);
-            heapify(rootIndex);
+            heapify(rootIndex, bucketSize);
         } else {
             setNextGatherOffset(rootIndex, next - 1);
         }
@@ -172,14 +171,12 @@ public class DoubleBucketedSort implements Releasable {
             return blockFactory.newConstantNullBlock(selected.getPositionCount());
         }
 
-        // Used to sort the values in the bucket.
-        var bucketValues = new double[bucketSize];
-
         try (var builder = blockFactory.newDoubleBlockBuilder(selected.getPositionCount())) {
             for (int s = 0; s < selected.getPositionCount(); s++) {
                 int bucket = selected.getInt(s);
 
                 var bounds = getBucketValuesIndexes(bucket);
+                var rootIndex = bounds.v1();
                 var size = bounds.v2() - bounds.v1();
 
                 if (size == 0) {
@@ -192,22 +189,15 @@ public class DoubleBucketedSort implements Releasable {
                     continue;
                 }
 
-                for (int i = 0; i < size; i++) {
-                    bucketValues[i] = values.get(bounds.v1() + i);
+                // If we are in the gathering mode, we need to heapify before sorting.
+                if (inHeapMode(bucket) == false) {
+                    heapify(rootIndex, (int) size);
                 }
-
-                // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting
-                Arrays.sort(bucketValues, 0, (int) size);
+                heapSort(rootIndex, (int) size);
 
                 builder.beginPositionEntry();
-                if (order == SortOrder.ASC) {
-                    for (int i = 0; i < size; i++) {
-                        builder.appendDouble(bucketValues[i]);
-                    }
-                } else {
-                    for (int i = (int) size - 1; i >= 0; i--) {
-                        builder.appendDouble(bucketValues[i]);
-                    }
+                for (int i = 0; i < size; i++) {
+                    builder.appendDouble(values.get(bounds.v1() + i));
                 }
                 builder.endPositionEntry();
             }
@@ -305,10 +295,28 @@ public class DoubleBucketedSort implements Releasable {
      * </ul>
      * @param rootIndex the index of the start of the bucket
      */
-    private void heapify(long rootIndex) {
-        int maxParent = bucketSize / 2 - 1;
+    private void heapify(long rootIndex, int heapSize) {
+        int maxParent = heapSize / 2 - 1;
         for (int parent = maxParent; parent >= 0; parent--) {
-            downHeap(rootIndex, parent);
+            downHeap(rootIndex, parent, heapSize);
+        }
+    }
+
+    /**
+     * Sorts all the values in the heap using heap sort algorithm.
+     * This runs in {@code O(n log n)} time.
+     * @param rootIndex index of the start of the bucket
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in the half-open range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
+     */
+    private void heapSort(long rootIndex, int heapSize) {
+        while (heapSize > 0) {
+            swap(rootIndex, rootIndex + heapSize - 1);
+            heapSize--;
+            downHeap(rootIndex, 0, heapSize);
         }
     }
 
@@ -318,22 +326,27 @@ public class DoubleBucketedSort implements Releasable {
      * @param rootIndex index of the start of the bucket
      * @param parent Index within the bucket of the parent to check.
      *               For example, 0 is the "root".
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in the half-open range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
      */
-    private void downHeap(long rootIndex, int parent) {
+    private void downHeap(long rootIndex, int parent, int heapSize) {
         while (true) {
             long parentIndex = rootIndex + parent;
             int worst = parent;
             long worstIndex = parentIndex;
             int leftChild = parent * 2 + 1;
             long leftIndex = rootIndex + leftChild;
-            if (leftChild < bucketSize) {
+            if (leftChild < heapSize) {
                 if (betterThan(values.get(worstIndex), values.get(leftIndex))) {
                     worst = leftChild;
                     worstIndex = leftIndex;
                 }
                 int rightChild = leftChild + 1;
                 long rightIndex = rootIndex + rightChild;
-                if (rightChild < bucketSize && betterThan(values.get(worstIndex), values.get(rightIndex))) {
+                if (rightChild < heapSize && betterThan(values.get(worstIndex), values.get(rightIndex))) {
                     worst = rightChild;
                     worstIndex = rightIndex;
                 }

+ 38 - 25
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/FloatBucketedSort.java

@@ -20,7 +20,6 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.sort.BucketedSort;
 import org.elasticsearch.search.sort.SortOrder;
 
-import java.util.Arrays;
 import java.util.stream.IntStream;
 
 /**
@@ -95,7 +94,7 @@ public class FloatBucketedSort implements Releasable {
         if (inHeapMode(bucket)) {
             if (betterThan(value, values.get(rootIndex))) {
                 values.set(rootIndex, value);
-                downHeap(rootIndex, 0);
+                downHeap(rootIndex, 0, bucketSize);
             }
             return;
         }
@@ -111,7 +110,7 @@ public class FloatBucketedSort implements Releasable {
         values.set(index, value);
         if (next == 0) {
             heapMode.set(bucket);
-            heapify(rootIndex);
+            heapify(rootIndex, bucketSize);
         } else {
             setNextGatherOffset(rootIndex, next - 1);
         }
@@ -172,14 +171,12 @@ public class FloatBucketedSort implements Releasable {
             return blockFactory.newConstantNullBlock(selected.getPositionCount());
         }
 
-        // Used to sort the values in the bucket.
-        var bucketValues = new float[bucketSize];
-
         try (var builder = blockFactory.newFloatBlockBuilder(selected.getPositionCount())) {
             for (int s = 0; s < selected.getPositionCount(); s++) {
                 int bucket = selected.getInt(s);
 
                 var bounds = getBucketValuesIndexes(bucket);
+                var rootIndex = bounds.v1();
                 var size = bounds.v2() - bounds.v1();
 
                 if (size == 0) {
@@ -192,22 +189,15 @@ public class FloatBucketedSort implements Releasable {
                     continue;
                 }
 
-                for (int i = 0; i < size; i++) {
-                    bucketValues[i] = values.get(bounds.v1() + i);
+                // If we are in the gathering mode, we need to heapify before sorting.
+                if (inHeapMode(bucket) == false) {
+                    heapify(rootIndex, (int) size);
                 }
-
-                // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting
-                Arrays.sort(bucketValues, 0, (int) size);
+                heapSort(rootIndex, (int) size);
 
                 builder.beginPositionEntry();
-                if (order == SortOrder.ASC) {
-                    for (int i = 0; i < size; i++) {
-                        builder.appendFloat(bucketValues[i]);
-                    }
-                } else {
-                    for (int i = (int) size - 1; i >= 0; i--) {
-                        builder.appendFloat(bucketValues[i]);
-                    }
+                for (int i = 0; i < size; i++) {
+                    builder.appendFloat(values.get(bounds.v1() + i));
                 }
                 builder.endPositionEntry();
             }
@@ -305,10 +295,28 @@ public class FloatBucketedSort implements Releasable {
      * </ul>
      * @param rootIndex the index of the start of the bucket
      */
-    private void heapify(long rootIndex) {
-        int maxParent = bucketSize / 2 - 1;
+    private void heapify(long rootIndex, int heapSize) {
+        int maxParent = heapSize / 2 - 1;
         for (int parent = maxParent; parent >= 0; parent--) {
-            downHeap(rootIndex, parent);
+            downHeap(rootIndex, parent, heapSize);
+        }
+    }
+
+    /**
+     * Sorts all the values in the heap using heap sort algorithm.
+     * This runs in {@code O(n log n)} time.
+     * @param rootIndex index of the start of the bucket
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in the half-open range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
+     */
+    private void heapSort(long rootIndex, int heapSize) {
+        while (heapSize > 0) {
+            swap(rootIndex, rootIndex + heapSize - 1);
+            heapSize--;
+            downHeap(rootIndex, 0, heapSize);
         }
     }
 
@@ -318,22 +326,27 @@ public class FloatBucketedSort implements Releasable {
      * @param rootIndex index of the start of the bucket
      * @param parent Index within the bucket of the parent to check.
      *               For example, 0 is the "root".
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in the half-open range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
      */
-    private void downHeap(long rootIndex, int parent) {
+    private void downHeap(long rootIndex, int parent, int heapSize) {
         while (true) {
             long parentIndex = rootIndex + parent;
             int worst = parent;
             long worstIndex = parentIndex;
             int leftChild = parent * 2 + 1;
             long leftIndex = rootIndex + leftChild;
-            if (leftChild < bucketSize) {
+            if (leftChild < heapSize) {
                 if (betterThan(values.get(worstIndex), values.get(leftIndex))) {
                     worst = leftChild;
                     worstIndex = leftIndex;
                 }
                 int rightChild = leftChild + 1;
                 long rightIndex = rootIndex + rightChild;
-                if (rightChild < bucketSize && betterThan(values.get(worstIndex), values.get(rightIndex))) {
+                if (rightChild < heapSize && betterThan(values.get(worstIndex), values.get(rightIndex))) {
                     worst = rightChild;
                     worstIndex = rightIndex;
                 }

+ 38 - 25
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/IntBucketedSort.java

@@ -20,7 +20,6 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.sort.BucketedSort;
 import org.elasticsearch.search.sort.SortOrder;
 
-import java.util.Arrays;
 import java.util.stream.IntStream;
 
 /**
@@ -95,7 +94,7 @@ public class IntBucketedSort implements Releasable {
         if (inHeapMode(bucket)) {
             if (betterThan(value, values.get(rootIndex))) {
                 values.set(rootIndex, value);
-                downHeap(rootIndex, 0);
+                downHeap(rootIndex, 0, bucketSize);
             }
             return;
         }
@@ -111,7 +110,7 @@ public class IntBucketedSort implements Releasable {
         values.set(index, value);
         if (next == 0) {
             heapMode.set(bucket);
-            heapify(rootIndex);
+            heapify(rootIndex, bucketSize);
         } else {
             setNextGatherOffset(rootIndex, next - 1);
         }
@@ -172,14 +171,12 @@ public class IntBucketedSort implements Releasable {
             return blockFactory.newConstantNullBlock(selected.getPositionCount());
         }
 
-        // Used to sort the values in the bucket.
-        var bucketValues = new int[bucketSize];
-
         try (var builder = blockFactory.newIntBlockBuilder(selected.getPositionCount())) {
             for (int s = 0; s < selected.getPositionCount(); s++) {
                 int bucket = selected.getInt(s);
 
                 var bounds = getBucketValuesIndexes(bucket);
+                var rootIndex = bounds.v1();
                 var size = bounds.v2() - bounds.v1();
 
                 if (size == 0) {
@@ -192,22 +189,15 @@ public class IntBucketedSort implements Releasable {
                     continue;
                 }
 
-                for (int i = 0; i < size; i++) {
-                    bucketValues[i] = values.get(bounds.v1() + i);
+                // If we are in the gathering mode, we need to heapify before sorting.
+                if (inHeapMode(bucket) == false) {
+                    heapify(rootIndex, (int) size);
                 }
-
-                // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting
-                Arrays.sort(bucketValues, 0, (int) size);
+                heapSort(rootIndex, (int) size);
 
                 builder.beginPositionEntry();
-                if (order == SortOrder.ASC) {
-                    for (int i = 0; i < size; i++) {
-                        builder.appendInt(bucketValues[i]);
-                    }
-                } else {
-                    for (int i = (int) size - 1; i >= 0; i--) {
-                        builder.appendInt(bucketValues[i]);
-                    }
+                for (int i = 0; i < size; i++) {
+                    builder.appendInt(values.get(bounds.v1() + i));
                 }
                 builder.endPositionEntry();
             }
@@ -305,10 +295,28 @@ public class IntBucketedSort implements Releasable {
      * </ul>
      * @param rootIndex the index of the start of the bucket
      */
-    private void heapify(long rootIndex) {
-        int maxParent = bucketSize / 2 - 1;
+    private void heapify(long rootIndex, int heapSize) {
+        int maxParent = heapSize / 2 - 1;
         for (int parent = maxParent; parent >= 0; parent--) {
-            downHeap(rootIndex, parent);
+            downHeap(rootIndex, parent, heapSize);
+        }
+    }
+
+    /**
+     * Sorts all the values in the heap using heap sort algorithm.
+     * This runs in {@code O(n log n)} time.
+     * @param rootIndex index of the start of the bucket
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in the half-open range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
+     */
+    private void heapSort(long rootIndex, int heapSize) {
+        while (heapSize > 0) {
+            swap(rootIndex, rootIndex + heapSize - 1);
+            heapSize--;
+            downHeap(rootIndex, 0, heapSize);
         }
     }
 
@@ -318,22 +326,27 @@ public class IntBucketedSort implements Releasable {
      * @param rootIndex index of the start of the bucket
      * @param parent Index within the bucket of the parent to check.
      *               For example, 0 is the "root".
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in the half-open range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
      */
-    private void downHeap(long rootIndex, int parent) {
+    private void downHeap(long rootIndex, int parent, int heapSize) {
         while (true) {
             long parentIndex = rootIndex + parent;
             int worst = parent;
             long worstIndex = parentIndex;
             int leftChild = parent * 2 + 1;
             long leftIndex = rootIndex + leftChild;
-            if (leftChild < bucketSize) {
+            if (leftChild < heapSize) {
                 if (betterThan(values.get(worstIndex), values.get(leftIndex))) {
                     worst = leftChild;
                     worstIndex = leftIndex;
                 }
                 int rightChild = leftChild + 1;
                 long rightIndex = rootIndex + rightChild;
-                if (rightChild < bucketSize && betterThan(values.get(worstIndex), values.get(rightIndex))) {
+                if (rightChild < heapSize && betterThan(values.get(worstIndex), values.get(rightIndex))) {
                     worst = rightChild;
                     worstIndex = rightIndex;
                 }

+ 38 - 25
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/LongBucketedSort.java

@@ -20,7 +20,6 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.sort.BucketedSort;
 import org.elasticsearch.search.sort.SortOrder;
 
-import java.util.Arrays;
 import java.util.stream.IntStream;
 
 /**
@@ -95,7 +94,7 @@ public class LongBucketedSort implements Releasable {
         if (inHeapMode(bucket)) {
             if (betterThan(value, values.get(rootIndex))) {
                 values.set(rootIndex, value);
-                downHeap(rootIndex, 0);
+                downHeap(rootIndex, 0, bucketSize);
             }
             return;
         }
@@ -111,7 +110,7 @@ public class LongBucketedSort implements Releasable {
         values.set(index, value);
         if (next == 0) {
             heapMode.set(bucket);
-            heapify(rootIndex);
+            heapify(rootIndex, bucketSize);
         } else {
             setNextGatherOffset(rootIndex, next - 1);
         }
@@ -172,14 +171,12 @@ public class LongBucketedSort implements Releasable {
             return blockFactory.newConstantNullBlock(selected.getPositionCount());
         }
 
-        // Used to sort the values in the bucket.
-        var bucketValues = new long[bucketSize];
-
         try (var builder = blockFactory.newLongBlockBuilder(selected.getPositionCount())) {
             for (int s = 0; s < selected.getPositionCount(); s++) {
                 int bucket = selected.getInt(s);
 
                 var bounds = getBucketValuesIndexes(bucket);
+                var rootIndex = bounds.v1();
                 var size = bounds.v2() - bounds.v1();
 
                 if (size == 0) {
@@ -192,22 +189,15 @@ public class LongBucketedSort implements Releasable {
                     continue;
                 }
 
-                for (int i = 0; i < size; i++) {
-                    bucketValues[i] = values.get(bounds.v1() + i);
+                // If we are in the gathering mode, we need to heapify before sorting.
+                if (inHeapMode(bucket) == false) {
+                    heapify(rootIndex, (int) size);
                 }
-
-                // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting
-                Arrays.sort(bucketValues, 0, (int) size);
+                heapSort(rootIndex, (int) size);
 
                 builder.beginPositionEntry();
-                if (order == SortOrder.ASC) {
-                    for (int i = 0; i < size; i++) {
-                        builder.appendLong(bucketValues[i]);
-                    }
-                } else {
-                    for (int i = (int) size - 1; i >= 0; i--) {
-                        builder.appendLong(bucketValues[i]);
-                    }
+                for (int i = 0; i < size; i++) {
+                    builder.appendLong(values.get(bounds.v1() + i));
                 }
                 builder.endPositionEntry();
             }
@@ -305,10 +295,28 @@ public class LongBucketedSort implements Releasable {
      * </ul>
      * @param rootIndex the index of the start of the bucket
      */
-    private void heapify(long rootIndex) {
-        int maxParent = bucketSize / 2 - 1;
+    private void heapify(long rootIndex, int heapSize) {
+        int maxParent = heapSize / 2 - 1;
         for (int parent = maxParent; parent >= 0; parent--) {
-            downHeap(rootIndex, parent);
+            downHeap(rootIndex, parent, heapSize);
+        }
+    }
+
+    /**
+     * Sorts all the values in the heap using heap sort algorithm.
+     * This runs in {@code O(n log n)} time.
+     * @param rootIndex index of the start of the bucket
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in the half-open range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
+     */
+    private void heapSort(long rootIndex, int heapSize) {
+        while (heapSize > 0) {
+            swap(rootIndex, rootIndex + heapSize - 1);
+            heapSize--;
+            downHeap(rootIndex, 0, heapSize);
         }
     }
 
@@ -318,22 +326,27 @@ public class LongBucketedSort implements Releasable {
      * @param rootIndex index of the start of the bucket
      * @param parent Index within the bucket of the parent to check.
      *               For example, 0 is the "root".
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in the half-open range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
      */
-    private void downHeap(long rootIndex, int parent) {
+    private void downHeap(long rootIndex, int parent, int heapSize) {
         while (true) {
             long parentIndex = rootIndex + parent;
             int worst = parent;
             long worstIndex = parentIndex;
             int leftChild = parent * 2 + 1;
             long leftIndex = rootIndex + leftChild;
-            if (leftChild < bucketSize) {
+            if (leftChild < heapSize) {
                 if (betterThan(values.get(worstIndex), values.get(leftIndex))) {
                     worst = leftChild;
                     worstIndex = leftIndex;
                 }
                 int rightChild = leftChild + 1;
                 long rightIndex = rootIndex + rightChild;
-                if (rightChild < bucketSize && betterThan(values.get(worstIndex), values.get(rightIndex))) {
+                if (rightChild < heapSize && betterThan(values.get(worstIndex), values.get(rightIndex))) {
                     worst = rightChild;
                     worstIndex = rightIndex;
                 }

+ 0 - 3
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BooleanBucketedSort.java

@@ -53,9 +53,6 @@ public class BooleanBucketedSort implements Releasable {
 
     /**
      * Collects a {@code value} into a {@code bucket}.
-     * <p>
-     *     It may or may not be inserted in the heap, depending on if it is better than the current root.
-     * </p>
      */
     public void collect(boolean value, int bucket) {
         long rootIndex = (long) bucket * 2;

+ 39 - 29
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java

@@ -24,7 +24,6 @@ import org.elasticsearch.core.Releasables;
 import org.elasticsearch.search.sort.BucketedSort;
 import org.elasticsearch.search.sort.SortOrder;
 
-import java.util.Arrays;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
@@ -122,7 +121,7 @@ public class BytesRefBucketedSort implements Releasable {
         if (common.inHeapMode(bucket)) {
             if (betterThan(value, values.get(rootIndex).bytesRefView())) {
                 clearedBytesAt(rootIndex).append(value);
-                downHeap(rootIndex, 0);
+                downHeap(rootIndex, 0, common.bucketSize);
             }
             checkInvariant(bucket);
             return;
@@ -138,7 +137,7 @@ public class BytesRefBucketedSort implements Releasable {
         clearedBytesAt(index).append(value);
         if (next == 0) {
             common.enableHeapMode(bucket);
-            heapify(rootIndex);
+            heapify(rootIndex, common.bucketSize);
         } else {
             ByteUtils.writeIntLE(next - 1, values.get(rootIndex).bytes(), 0);
         }
@@ -182,9 +181,6 @@ public class BytesRefBucketedSort implements Releasable {
             return blockFactory.newConstantNullBlock(selected.getPositionCount());
         }
 
-        // Used to sort the values in the bucket.
-        BytesRef[] bucketValues = new BytesRef[common.bucketSize];
-
         try (var builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) {
             for (int s = 0; s < selected.getPositionCount(); s++) {
                 int bucket = selected.getInt(s);
@@ -212,25 +208,18 @@ public class BytesRefBucketedSort implements Releasable {
                     continue;
                 }
 
-                for (int i = 0; i < size; i++) {
-                    try (BreakingBytesRefBuilder bytes = values.get(start + i)) {
-                        bucketValues[i] = bytes.bytesRefView();
-                    }
-                    values.set(start + i, null);
+                // If we are in the gathering mode, we need to heapify before sorting.
+                if (common.inHeapMode(bucket) == false) {
+                    heapify(start, (int) size);
                 }
-
-                // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting
-                Arrays.sort(bucketValues, 0, (int) size);
+                heapSort(start, (int) size);
 
                 builder.beginPositionEntry();
-                if (common.order == SortOrder.ASC) {
-                    for (int i = 0; i < size; i++) {
-                        builder.appendBytesRef(bucketValues[i]);
-                    }
-                } else {
-                    for (int i = (int) size - 1; i >= 0; i--) {
-                        builder.appendBytesRef(bucketValues[i]);
+                for (int i = 0; i < size; i++) {
+                    try (BreakingBytesRefBuilder bytes = values.get(start + i)) {
+                        builder.appendBytesRef(bytes.bytesRefView());
                     }
+                    values.set(start + i, null);
                 }
                 builder.endPositionEntry();
             }
@@ -339,10 +328,28 @@ public class BytesRefBucketedSort implements Releasable {
      * </ul>
      * @param rootIndex the index of the start of the bucket
      */
-    private void heapify(long rootIndex) {
-        int maxParent = common.bucketSize / 2 - 1;
+    private void heapify(long rootIndex, int heapSize) {
+        int maxParent = heapSize / 2 - 1;
         for (int parent = maxParent; parent >= 0; parent--) {
-            downHeap(rootIndex, parent);
+            downHeap(rootIndex, parent, heapSize);
+        }
+    }
+
+    /**
+     * Sorts all the values in the heap using heap sort algorithm.
+     * This runs in {@code O(n log n)} time.
+     * @param rootIndex index of the start of the bucket
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in the half-open range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
+     */
+    private void heapSort(long rootIndex, int heapSize) {
+        while (heapSize > 0) {
+            swap(rootIndex, rootIndex + heapSize - 1);
+            heapSize--;
+            downHeap(rootIndex, 0, heapSize);
         }
     }
 
@@ -352,24 +359,27 @@ public class BytesRefBucketedSort implements Releasable {
      * @param rootIndex index of the start of the bucket
      * @param parent Index within the bucket of the parent to check.
      *               For example, 0 is the "root".
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in the half-open range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
      */
-    private void downHeap(long rootIndex, int parent) {
+    private void downHeap(long rootIndex, int parent, int heapSize) {
         while (true) {
             long parentIndex = rootIndex + parent;
             int worst = parent;
             long worstIndex = parentIndex;
             int leftChild = parent * 2 + 1;
             long leftIndex = rootIndex + leftChild;
-            if (leftChild < common.bucketSize) {
+            if (leftChild < heapSize) {
                 if (betterThan(values.get(worstIndex).bytesRefView(), values.get(leftIndex).bytesRefView())) {
                     worst = leftChild;
                     worstIndex = leftIndex;
                 }
                 int rightChild = leftChild + 1;
                 long rightIndex = rootIndex + rightChild;
-                if (rightChild < common.bucketSize
-                    && betterThan(values.get(worstIndex).bytesRefView(), values.get(rightIndex).bytesRefView())) {
-
+                if (rightChild < heapSize && betterThan(values.get(worstIndex).bytesRefView(), values.get(rightIndex).bytesRefView())) {
                     worst = rightChild;
                     worstIndex = rightIndex;
                 }

+ 39 - 26
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/IpBucketedSort.java

@@ -21,7 +21,6 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.sort.BucketedSort;
 import org.elasticsearch.search.sort.SortOrder;
 
-import java.util.Arrays;
 import java.util.stream.IntStream;
 
 /**
@@ -100,7 +99,7 @@ public class IpBucketedSort implements Releasable {
         if (common.inHeapMode(bucket)) {
             if (betterThan(value, get(rootIndex, scratch1))) {
                 set(rootIndex, value);
-                downHeap(rootIndex, 0);
+                downHeap(rootIndex, 0, common.bucketSize);
             }
             return;
         }
@@ -115,7 +114,7 @@ public class IpBucketedSort implements Releasable {
         set(index, value);
         if (next == 0) {
             common.enableHeapMode(bucket);
-            heapify(rootIndex);
+            heapify(rootIndex, common.bucketSize);
         } else {
             setNextGatherOffset(rootIndex, next - 1);
         }
@@ -163,14 +162,12 @@ public class IpBucketedSort implements Releasable {
             return blockFactory.newConstantNullBlock(selected.getPositionCount());
         }
 
-        // Used to sort the values in the bucket.
-        var bucketValues = new BytesRef[common.bucketSize];
-
         try (var builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) {
             for (int s = 0; s < selected.getPositionCount(); s++) {
                 int bucket = selected.getInt(s);
 
                 var bounds = getBucketValuesIndexes(bucket);
+                var rootIndex = bounds.v1();
                 var size = bounds.v2() - bounds.v1();
 
                 if (size == 0) {
@@ -179,26 +176,19 @@ public class IpBucketedSort implements Releasable {
                 }
 
                 if (size == 1) {
-                    builder.appendBytesRef(get(bounds.v1(), scratch1));
+                    builder.appendBytesRef(get(rootIndex, scratch1));
                     continue;
                 }
 
-                for (int i = 0; i < size; i++) {
-                    bucketValues[i] = get(bounds.v1() + i, new BytesRef());
+                // If we are in the gathering mode, we need to heapify before sorting.
+                if (common.inHeapMode(bucket) == false) {
+                    heapify(rootIndex, (int) size);
                 }
-
-                // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting
-                Arrays.sort(bucketValues, 0, (int) size);
+                heapSort(rootIndex, (int) size);
 
                 builder.beginPositionEntry();
-                if (common.order == SortOrder.ASC) {
-                    for (int i = 0; i < size; i++) {
-                        builder.appendBytesRef(bucketValues[i]);
-                    }
-                } else {
-                    for (int i = (int) size - 1; i >= 0; i--) {
-                        builder.appendBytesRef(bucketValues[i]);
-                    }
+                for (int i = 0; i < size; i++) {
+                    builder.appendBytesRef(get(rootIndex + i, new BytesRef()));
                 }
                 builder.endPositionEntry();
             }
@@ -319,10 +309,28 @@ public class IpBucketedSort implements Releasable {
      * </ul>
      * @param rootIndex the index the start of the bucket
      */
-    private void heapify(long rootIndex) {
-        int maxParent = common.bucketSize / 2 - 1;
+    private void heapify(long rootIndex, int heapSize) {
+        int maxParent = heapSize / 2 - 1;
         for (int parent = maxParent; parent >= 0; parent--) {
-            downHeap(rootIndex, parent);
+            downHeap(rootIndex, parent, heapSize);
+        }
+    }
+
+    /**
+     * Sorts all the values in the heap using the heap sort algorithm.
+     * This runs in {@code O(n log n)} time.
+     * @param rootIndex index of the start of the bucket
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
+     */
+    private void heapSort(long rootIndex, int heapSize) {
+        while (heapSize > 0) {
+            swap(rootIndex, rootIndex + heapSize - 1);
+            heapSize--;
+            downHeap(rootIndex, 0, heapSize);
         }
     }
 
@@ -332,22 +340,27 @@ public class IpBucketedSort implements Releasable {
      * @param rootIndex index of the start of the bucket
      * @param parent Index within the bucket of the parent to check.
      *               For example, 0 is the "root".
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
      */
-    private void downHeap(long rootIndex, int parent) {
+    private void downHeap(long rootIndex, int parent, int heapSize) {
         while (true) {
             long parentIndex = rootIndex + parent;
             int worst = parent;
             long worstIndex = parentIndex;
             int leftChild = parent * 2 + 1;
             long leftIndex = rootIndex + leftChild;
-            if (leftChild < common.bucketSize) {
+            if (leftChild < heapSize) {
                 if (betterThan(get(worstIndex, scratch1), get(leftIndex, scratch2))) {
                     worst = leftChild;
                     worstIndex = leftIndex;
                 }
                 int rightChild = leftChild + 1;
                 long rightIndex = rootIndex + rightChild;
-                if (rightChild < common.bucketSize && betterThan(get(worstIndex, scratch1), get(rightIndex, scratch2))) {
+                if (rightChild < heapSize && betterThan(get(worstIndex, scratch1), get(rightIndex, scratch2))) {
                     worst = rightChild;
                     worstIndex = rightIndex;
                 }

+ 38 - 25
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/X-BucketedSort.java.st

@@ -20,7 +20,6 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.sort.BucketedSort;
 import org.elasticsearch.search.sort.SortOrder;
 
-import java.util.Arrays;
 import java.util.stream.IntStream;
 
 /**
@@ -95,7 +94,7 @@ public class $Type$BucketedSort implements Releasable {
         if (inHeapMode(bucket)) {
             if (betterThan(value, values.get(rootIndex))) {
                 values.set(rootIndex, value);
-                downHeap(rootIndex, 0);
+                downHeap(rootIndex, 0, bucketSize);
             }
             return;
         }
@@ -111,7 +110,7 @@ public class $Type$BucketedSort implements Releasable {
         values.set(index, value);
         if (next == 0) {
             heapMode.set(bucket);
-            heapify(rootIndex);
+            heapify(rootIndex, bucketSize);
         } else {
             setNextGatherOffset(rootIndex, next - 1);
         }
@@ -172,14 +171,12 @@ public class $Type$BucketedSort implements Releasable {
             return blockFactory.newConstantNullBlock(selected.getPositionCount());
         }
 
-        // Used to sort the values in the bucket.
-        var bucketValues = new $type$[bucketSize];
-
         try (var builder = blockFactory.new$Type$BlockBuilder(selected.getPositionCount())) {
             for (int s = 0; s < selected.getPositionCount(); s++) {
                 int bucket = selected.getInt(s);
 
                 var bounds = getBucketValuesIndexes(bucket);
+                var rootIndex = bounds.v1();
                 var size = bounds.v2() - bounds.v1();
 
                 if (size == 0) {
@@ -192,22 +189,15 @@ public class $Type$BucketedSort implements Releasable {
                     continue;
                 }
 
-                for (int i = 0; i < size; i++) {
-                    bucketValues[i] = values.get(bounds.v1() + i);
+                // If we are in the gathering mode, we need to heapify before sorting.
+                if (inHeapMode(bucket) == false) {
+                    heapify(rootIndex, (int) size);
                 }
-
-                // TODO: Make use of heap structures to faster iterate in order instead of copying and sorting
-                Arrays.sort(bucketValues, 0, (int) size);
+                heapSort(rootIndex, (int) size);
 
                 builder.beginPositionEntry();
-                if (order == SortOrder.ASC) {
-                    for (int i = 0; i < size; i++) {
-                        builder.append$Type$(bucketValues[i]);
-                    }
-                } else {
-                    for (int i = (int) size - 1; i >= 0; i--) {
-                        builder.append$Type$(bucketValues[i]);
-                    }
+                for (int i = 0; i < size; i++) {
+                    builder.append$Type$(values.get(bounds.v1() + i));
                 }
                 builder.endPositionEntry();
             }
@@ -309,10 +299,28 @@ $endif$
      * </ul>
      * @param rootIndex the index the start of the bucket
      */
-    private void heapify(long rootIndex) {
-        int maxParent = bucketSize / 2 - 1;
+    private void heapify(long rootIndex, int heapSize) {
+        int maxParent = heapSize / 2 - 1;
         for (int parent = maxParent; parent >= 0; parent--) {
-            downHeap(rootIndex, parent);
+            downHeap(rootIndex, parent, heapSize);
+        }
+    }
+
+    /**
+     * Sorts all the values in the heap using the heap sort algorithm.
+     * This runs in {@code O(n log n)} time.
+     * @param rootIndex index of the start of the bucket
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
+     */
+    private void heapSort(long rootIndex, int heapSize) {
+        while (heapSize > 0) {
+            swap(rootIndex, rootIndex + heapSize - 1);
+            heapSize--;
+            downHeap(rootIndex, 0, heapSize);
         }
     }
 
@@ -322,22 +330,27 @@ $endif$
      * @param rootIndex index of the start of the bucket
      * @param parent Index within the bucket of the parent to check.
      *               For example, 0 is the "root".
+     * @param heapSize Number of values that belong to the heap.
+     *                 Can be less than bucketSize.
+     *                 In such a case, the remaining values in range
+     *                 [rootIndex + heapSize, rootIndex + bucketSize)
+     *                 are *not* considered part of the heap.
      */
-    private void downHeap(long rootIndex, int parent) {
+    private void downHeap(long rootIndex, int parent, int heapSize) {
         while (true) {
             long parentIndex = rootIndex + parent;
             int worst = parent;
             long worstIndex = parentIndex;
             int leftChild = parent * 2 + 1;
             long leftIndex = rootIndex + leftChild;
-            if (leftChild < bucketSize) {
+            if (leftChild < heapSize) {
                 if (betterThan(values.get(worstIndex), values.get(leftIndex))) {
                     worst = leftChild;
                     worstIndex = leftIndex;
                 }
                 int rightChild = leftChild + 1;
                 long rightIndex = rootIndex + rightChild;
-                if (rightChild < bucketSize && betterThan(values.get(worstIndex), values.get(rightIndex))) {
+                if (rightChild < heapSize && betterThan(values.get(worstIndex), values.get(rightIndex))) {
                     worst = rightChild;
                     worstIndex = rightIndex;
                 }