Selaa lähdekoodia

ESQL: Pre-allocate rows in TopNOperator (#104796)

Starting with empty rows and growing them causes lots of allocations and
thus bad performance in case of many large fields being contained in the
rows.
Instead, use the previously encountered row to estimate the size of the next row.
Alexander Spies 1 vuosi sitten
vanhempi
commit
1ef8beca60

+ 5 - 0
docs/changelog/104796.yaml

@@ -0,0 +1,5 @@
+pr: 104796
+summary: "ESQL: Pre-allocate rows in TopNOperator"
+area: ES|QL
+type: enhancement
+issues: []

+ 29 - 13
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/BreakingBytesRefBuilder.java

@@ -31,6 +31,16 @@ public class BreakingBytesRefBuilder implements Accountable, Releasable {
      * @param label the label reported by the breaker when it breaks
      */
     public BreakingBytesRefBuilder(CircuitBreaker breaker, String label) {
+        this(breaker, label, 0);
+    }
+
+    /**
+     * Build.
+     * @param breaker the {@link CircuitBreaker} to check on resize
+     * @param label the label reported by the breaker when it breaks
+     * @param initialCapacity the number of bytes initially allocated
+     */
+    public BreakingBytesRefBuilder(CircuitBreaker breaker, String label, int initialCapacity) {
         /*
          * We initialize BytesRef to a shared empty bytes array as is tradition.
          * It's a good tradition. We don't know how big the thing will ultimately
@@ -41,8 +51,12 @@ public class BreakingBytesRefBuilder implements Accountable, Releasable {
          * last long so it isn't worth making the accounting more complex to get it
          * perfect. And overcounting in general isn't too bad compared to undercounting.
          */
-        breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE + RamUsageEstimator.sizeOf(BytesRef.EMPTY_BYTES), label);
-        this.bytes = new BytesRef();
+        breaker.addEstimateBytesAndMaybeBreak(SHALLOW_SIZE + bytesArrayRamBytesUsed(initialCapacity), label);
+        if (initialCapacity == 0) {
+            this.bytes = new BytesRef();
+        } else {
+            this.bytes = new BytesRef(initialCapacity);
+        }
         this.breaker = breaker;
         this.label = label;
     }
@@ -52,17 +66,15 @@ public class BreakingBytesRefBuilder implements Accountable, Releasable {
      * {@code capacity}.
      */
     public void grow(int capacity) {
-        int oldLength = bytes.bytes.length;
-        if (oldLength > capacity) {
+        int oldCapacity = bytes.bytes.length;
+        if (oldCapacity > capacity) {
             return;
         }
-        int newLength = ArrayUtil.oversize(capacity, Byte.BYTES);
-        breaker.addEstimateBytesAndMaybeBreak(
-            RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + newLength),
-            label
-        );
-        bytes.bytes = ArrayUtil.growExact(bytes.bytes, newLength);
-        breaker.addWithoutBreaking(-RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + oldLength));
+        int oversizedCapacity = ArrayUtil.oversize(capacity, Byte.BYTES);
+        breaker.addEstimateBytesAndMaybeBreak(bytesArrayRamBytesUsed(oversizedCapacity), label);
+
+        bytes.bytes = ArrayUtil.growExact(bytes.bytes, oversizedCapacity);
+        breaker.addWithoutBreaking(-bytesArrayRamBytesUsed(oldCapacity));
     }
 
     /**
@@ -80,7 +92,7 @@ public class BreakingBytesRefBuilder implements Accountable, Releasable {
     }
 
     /**
-     * The number of bytes in to this buffer. It is <strong>not></strong>
+     * The number of bytes in this buffer. It is <strong>not></strong>
      * the capacity of the buffer.
      */
     public int length() {
@@ -139,7 +151,11 @@ public class BreakingBytesRefBuilder implements Accountable, Releasable {
 
     @Override
     public long ramBytesUsed() {
-        return SHALLOW_SIZE + RamUsageEstimator.sizeOf(bytes.bytes);
+        return SHALLOW_SIZE + bytesArrayRamBytesUsed(bytes.bytes.length);
+    }
+
+    private static long bytesArrayRamBytesUsed(long capacity) {
+        return RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + capacity);
     }
 
     @Override

+ 14 - 4
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java

@@ -71,11 +71,11 @@ public class TopNOperator implements Operator, Accountable {
          */
         final BreakingBytesRefBuilder values;
 
-        Row(CircuitBreaker breaker, List<SortOrder> sortOrders) {
+        Row(CircuitBreaker breaker, List<SortOrder> sortOrders, int preAllocatedKeysSize, int preAllocatedValueSize) {
             boolean success = false;
             try {
-                keys = new BreakingBytesRefBuilder(breaker, "topn");
-                values = new BreakingBytesRefBuilder(breaker, "topn");
+                keys = new BreakingBytesRefBuilder(breaker, "topn", preAllocatedKeysSize);
+                values = new BreakingBytesRefBuilder(breaker, "topn", preAllocatedValueSize);
                 bytesOrder = new BytesOrder(sortOrders, breaker, "topn");
                 success = true;
             } finally {
@@ -273,6 +273,9 @@ public class TopNOperator implements Operator, Accountable {
     private final List<SortOrder> sortOrders;
 
     private Row spare;
+    private int spareValuesPreAllocSize = 0;
+    private int spareKeysPreAllocSize = 0;
+
     private Iterator<Page> output;
 
     public TopNOperator(
@@ -349,12 +352,19 @@ public class TopNOperator implements Operator, Accountable {
 
             for (int i = 0; i < page.getPositionCount(); i++) {
                 if (spare == null) {
-                    spare = new Row(breaker, sortOrders);
+                    spare = new Row(breaker, sortOrders, spareKeysPreAllocSize, spareValuesPreAllocSize);
                 } else {
                     spare.keys.clear();
                     spare.values.clear();
                 }
                 rowFiller.row(i, spare);
+
+                // When rows are very long, appending the values one by one can lead to lots of allocations.
+                // To avoid this, pre-allocate at least as much size as in the last seen row.
+                // Let the pre-allocation size decay in case we only have 1 huge row and smaller rows otherwise.
+                spareKeysPreAllocSize = Math.max(spare.keys.length(), spareKeysPreAllocSize / 2);
+                spareValuesPreAllocSize = Math.max(spare.values.length(), spareValuesPreAllocSize / 2);
+
                 spare = inputQueue.insertWithOverflow(spare);
             }
         } finally {

+ 89 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/CountingCircuitBreaker.java

@@ -0,0 +1,89 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.breaker.CircuitBreakingException;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A CircuitBreaker that counts how often memory was requested.
+ * Delegates to another circuit breaker
+ */
+public class CountingCircuitBreaker implements CircuitBreaker {
+    CircuitBreaker delegate;
+    AtomicLong memoryRequestCount;
+
+    public CountingCircuitBreaker(CircuitBreaker delegate) {
+        this.delegate = delegate;
+        memoryRequestCount = new AtomicLong(0);
+    }
+
+    public long getMemoryRequestCount() {
+        return memoryRequestCount.get();
+    }
+
+    @Override
+    public void circuitBreak(String fieldName, long bytesNeeded) {
+        delegate.circuitBreak(fieldName, bytesNeeded);
+    }
+
+    @Override
+    public void addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException {
+        incMemoryRequestCountIfPositive(bytes);
+        delegate.addEstimateBytesAndMaybeBreak(bytes, label);
+    }
+
+    @Override
+    public void addWithoutBreaking(long bytes) {
+        incMemoryRequestCountIfPositive(bytes);
+        delegate.addWithoutBreaking(bytes);
+    }
+
+    @Override
+    public long getUsed() {
+        return delegate.getUsed();
+    }
+
+    @Override
+    public long getLimit() {
+        return delegate.getLimit();
+    }
+
+    @Override
+    public double getOverhead() {
+        return delegate.getOverhead();
+    }
+
+    @Override
+    public long getTrippedCount() {
+        return delegate.getTrippedCount();
+    }
+
+    @Override
+    public String getName() {
+        return delegate.getName();
+    }
+
+    @Override
+    public Durability getDurability() {
+        return delegate.getDurability();
+    }
+
+    @Override
+    public void setLimitAndOverhead(long limit, double overhead) {
+        delegate.setLimitAndOverhead(limit, overhead);
+    }
+
+    private void incMemoryRequestCountIfPositive(long bytes) {
+        if (bytes > 0) {
+            memoryRequestCount.incrementAndGet();
+        }
+    }
+}

+ 36 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.TestBlockBuilder;
 import org.elasticsearch.compute.data.TestBlockFactory;
 import org.elasticsearch.compute.operator.CannedSourceOperator;
+import org.elasticsearch.compute.operator.CountingCircuitBreaker;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;
@@ -77,6 +78,7 @@ import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
@@ -431,7 +433,7 @@ public class TopNOperatorTests extends OperatorTestCase {
             sortOrders,
             page
         );
-        TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"), sortOrders);
+        TopNOperator.Row row = new TopNOperator.Row(nonBreakingBigArrays().breakerService().getBreaker("request"), sortOrders, 0, 0);
         rf.row(position, row);
         return row;
     }
@@ -1396,6 +1398,39 @@ public class TopNOperatorTests extends OperatorTestCase {
         }
     }
 
+    public void testRowResizes() {
+        int columns = 1000;
+        int rows = 1000;
+        CountingCircuitBreaker breaker = new CountingCircuitBreaker(
+            new MockBigArrays.LimitedBreaker(CircuitBreaker.REQUEST, ByteSizeValue.ofGb(1))
+        );
+        List<ElementType> types = Collections.nCopies(columns, INT);
+        List<TopNEncoder> encoders = Collections.nCopies(columns, DEFAULT_UNSORTABLE);
+        try (
+            TopNOperator op = new TopNOperator(
+                driverContext().blockFactory(),
+                breaker,
+                10,
+                types,
+                encoders,
+                List.of(new TopNOperator.SortOrder(0, randomBoolean(), randomBoolean())),
+                randomPageSize()
+            )
+        ) {
+            int[] blockValues = IntStream.range(0, rows).toArray();
+            Block block = blockFactory().newIntArrayVector(blockValues, rows).asBlock();
+            Block[] blocks = new Block[1000];
+            for (int i = 0; i < 1000; i++) {
+                blocks[i] = block;
+                block.incRef();
+            }
+            block.decRef();
+            op.addInput(new Page(blocks));
+
+            assertThat(breaker.getMemoryRequestCount(), is(94L));
+        }
+    }
+
     @SuppressWarnings({ "unchecked", "rawtypes" })
     private static void readAsRows(List<List<List<Object>>> values, Page page) {
         if (page.getBlockCount() == 0) {

+ 8 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNRowTests.java

@@ -20,19 +20,19 @@ public class TopNRowTests extends ESTestCase {
     private final CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST);
 
     public void testRamBytesUsedEmpty() {
-        TopNOperator.Row row = new TopNOperator.Row(breaker, sortOrders());
+        TopNOperator.Row row = new TopNOperator.Row(breaker, sortOrders(), 0, 0);
         assertThat(row.ramBytesUsed(), equalTo(expectedRamBytesUsed(row)));
     }
 
     public void testRamBytesUsedSmall() {
-        TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST), sortOrders());
+        TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST), sortOrders(), 0, 0);
         row.keys.append(randomByte());
         row.values.append(randomByte());
         assertThat(row.ramBytesUsed(), equalTo(expectedRamBytesUsed(row)));
     }
 
     public void testRamBytesUsedBig() {
-        TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST), sortOrders());
+        TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST), sortOrders(), 0, 0);
         for (int i = 0; i < 10000; i++) {
             row.keys.append(randomByte());
             row.values.append(randomByte());
@@ -40,6 +40,11 @@ public class TopNRowTests extends ESTestCase {
         assertThat(row.ramBytesUsed(), equalTo(expectedRamBytesUsed(row)));
     }
 
+    public void testRamBytesUsedPreAllocated() {
+        TopNOperator.Row row = new TopNOperator.Row(new NoopCircuitBreaker(CircuitBreaker.REQUEST), sortOrders(), 64, 128);
+        assertThat(row.ramBytesUsed(), equalTo(expectedRamBytesUsed(row)));
+    }
+
     private static List<TopNOperator.SortOrder> sortOrders() {
         return List.of(
             new TopNOperator.SortOrder(randomNonNegativeInt(), randomBoolean(), randomBoolean()),