Browse Source

Serialize big array vectors (#106327)

With this change, we now serialize the underlying pages of big array
vectors instead of their individual values. Note that on the receiving
side we avoid slicing into the Netty buffer, because that buffer is not
tracked by the circuit breaker, which could be an issue for ESQL.
Nhat Nguyen 1 year ago
parent
commit
946cf54ee5

+ 5 - 0
docs/changelog/106327.yaml

@@ -0,0 +1,5 @@
+pr: 106327
+summary: Serialize big array vectors
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -145,6 +145,7 @@ public class TransportVersions {
     public static final TransportVersion ESQL_EXTENDED_ENRICH_TYPES = def(8_605_00_0);
     public static final TransportVersion KNN_EXPLICIT_BYTE_QUERY_VECTOR_PARSING = def(8_606_00_0);
     public static final TransportVersion ESQL_EXTENDED_ENRICH_INPUT_TYPE = def(8_607_00_0);
+    public static final TransportVersion ESQL_SERIALIZE_BIG_VECTOR = def(8_608_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 23 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBigArrayVector.java

@@ -8,9 +8,13 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.core.Releasable;
 
+import java.io.IOException;
+
 /**
  * Vector implementation that defers to an enclosed {@link BitArray}.
  * Does not take ownership of the array and does not adjust circuit breakers to account for it.
@@ -27,6 +31,25 @@ public final class BooleanBigArrayVector extends AbstractVector implements Boole
         this.values = values;
     }
 
+    static BooleanBigArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
+        BitArray values = new BitArray(blockFactory.bigArrays(), true, in);
+        boolean success = false;
+        try {
+            BooleanBigArrayVector vector = new BooleanBigArrayVector(values, positions, blockFactory);
+            blockFactory.adjustBreaker(vector.ramBytesUsed() - RamUsageEstimator.sizeOf(values));
+            success = true;
+            return vector;
+        } finally {
+            if (success == false) {
+                values.close();
+            }
+        }
+    }
+
+    void writeArrayVector(int positions, StreamOutput out) throws IOException {
+        values.writeTo(out);
+    }
+
     @Override
     public BooleanBlock asBlock() {
         return new BooleanVectorBlock(this);

+ 4 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVector.java

@@ -80,6 +80,7 @@ public sealed interface BooleanVector extends Vector permits ConstantBooleanVect
             case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory);
             case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantBooleanVector(in.readBoolean(), positions);
             case SERIALIZE_VECTOR_ARRAY -> BooleanArrayVector.readArrayVector(positions, in, blockFactory);
+            case SERIALIZE_VECTOR_BIG_ARRAY -> BooleanBigArrayVector.readArrayVector(positions, in, blockFactory);
             default -> {
                 assert false : "invalid vector serialization type [" + serializationType + "]";
                 throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]");
@@ -98,6 +99,9 @@ public sealed interface BooleanVector extends Vector permits ConstantBooleanVect
         } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof BooleanArrayVector v) {
             out.writeByte(SERIALIZE_VECTOR_ARRAY);
             v.writeArrayVector(positions, out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_VECTOR) && this instanceof BooleanBigArrayVector v) {
+            out.writeByte(SERIALIZE_VECTOR_BIG_ARRAY);
+            v.writeArrayVector(positions, out);
         } else {
             out.writeByte(SERIALIZE_VECTOR_VALUES);
             writeValues(this, positions, out);

+ 24 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBigArrayVector.java

@@ -8,9 +8,13 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.DoubleArray;
 import org.elasticsearch.core.Releasable;
 
+import java.io.IOException;
+
 /**
  * Vector implementation that defers to an enclosed {@link DoubleArray}.
  * Does not take ownership of the array and does not adjust circuit breakers to account for it.
@@ -27,6 +31,26 @@ public final class DoubleBigArrayVector extends AbstractVector implements Double
         this.values = values;
     }
 
+    static DoubleBigArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
+        DoubleArray values = blockFactory.bigArrays().newDoubleArray(positions, false);
+        boolean success = false;
+        try {
+            values.fillWith(in);
+            DoubleBigArrayVector vector = new DoubleBigArrayVector(values, positions, blockFactory);
+            blockFactory.adjustBreaker(vector.ramBytesUsed() - RamUsageEstimator.sizeOf(values));
+            success = true;
+            return vector;
+        } finally {
+            if (success == false) {
+                values.close();
+            }
+        }
+    }
+
+    void writeArrayVector(int positions, StreamOutput out) throws IOException {
+        values.writeTo(out);
+    }
+
     @Override
     public DoubleBlock asBlock() {
         return new DoubleVectorBlock(this);

+ 4 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVector.java

@@ -81,6 +81,7 @@ public sealed interface DoubleVector extends Vector permits ConstantDoubleVector
             case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory);
             case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantDoubleVector(in.readDouble(), positions);
             case SERIALIZE_VECTOR_ARRAY -> DoubleArrayVector.readArrayVector(positions, in, blockFactory);
+            case SERIALIZE_VECTOR_BIG_ARRAY -> DoubleBigArrayVector.readArrayVector(positions, in, blockFactory);
             default -> {
                 assert false : "invalid vector serialization type [" + serializationType + "]";
                 throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]");
@@ -99,6 +100,9 @@ public sealed interface DoubleVector extends Vector permits ConstantDoubleVector
         } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof DoubleArrayVector v) {
             out.writeByte(SERIALIZE_VECTOR_ARRAY);
             v.writeArrayVector(positions, out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_VECTOR) && this instanceof DoubleBigArrayVector v) {
+            out.writeByte(SERIALIZE_VECTOR_BIG_ARRAY);
+            v.writeArrayVector(positions, out);
         } else {
             out.writeByte(SERIALIZE_VECTOR_VALUES);
             writeValues(this, positions, out);

+ 24 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBigArrayVector.java

@@ -8,9 +8,13 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.IntArray;
 import org.elasticsearch.core.Releasable;
 
+import java.io.IOException;
+
 /**
  * Vector implementation that defers to an enclosed {@link IntArray}.
  * Does not take ownership of the array and does not adjust circuit breakers to account for it.
@@ -27,6 +31,26 @@ public final class IntBigArrayVector extends AbstractVector implements IntVector
         this.values = values;
     }
 
+    static IntBigArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
+        IntArray values = blockFactory.bigArrays().newIntArray(positions, false);
+        boolean success = false;
+        try {
+            values.fillWith(in);
+            IntBigArrayVector vector = new IntBigArrayVector(values, positions, blockFactory);
+            blockFactory.adjustBreaker(vector.ramBytesUsed() - RamUsageEstimator.sizeOf(values));
+            success = true;
+            return vector;
+        } finally {
+            if (success == false) {
+                values.close();
+            }
+        }
+    }
+
+    void writeArrayVector(int positions, StreamOutput out) throws IOException {
+        values.writeTo(out);
+    }
+
     @Override
     public IntBlock asBlock() {
         return new IntVectorBlock(this);

+ 4 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVector.java

@@ -80,6 +80,7 @@ public sealed interface IntVector extends Vector permits ConstantIntVector, IntA
             case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory);
             case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantIntVector(in.readInt(), positions);
             case SERIALIZE_VECTOR_ARRAY -> IntArrayVector.readArrayVector(positions, in, blockFactory);
+            case SERIALIZE_VECTOR_BIG_ARRAY -> IntBigArrayVector.readArrayVector(positions, in, blockFactory);
             default -> {
                 assert false : "invalid vector serialization type [" + serializationType + "]";
                 throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]");
@@ -98,6 +99,9 @@ public sealed interface IntVector extends Vector permits ConstantIntVector, IntA
         } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof IntArrayVector v) {
             out.writeByte(SERIALIZE_VECTOR_ARRAY);
             v.writeArrayVector(positions, out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_VECTOR) && this instanceof IntBigArrayVector v) {
+            out.writeByte(SERIALIZE_VECTOR_BIG_ARRAY);
+            v.writeArrayVector(positions, out);
         } else {
             out.writeByte(SERIALIZE_VECTOR_VALUES);
             writeValues(this, positions, out);

+ 24 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBigArrayVector.java

@@ -8,9 +8,13 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.LongArray;
 import org.elasticsearch.core.Releasable;
 
+import java.io.IOException;
+
 /**
  * Vector implementation that defers to an enclosed {@link LongArray}.
  * Does not take ownership of the array and does not adjust circuit breakers to account for it.
@@ -27,6 +31,26 @@ public final class LongBigArrayVector extends AbstractVector implements LongVect
         this.values = values;
     }
 
+    static LongBigArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
+        LongArray values = blockFactory.bigArrays().newLongArray(positions, false);
+        boolean success = false;
+        try {
+            values.fillWith(in);
+            LongBigArrayVector vector = new LongBigArrayVector(values, positions, blockFactory);
+            blockFactory.adjustBreaker(vector.ramBytesUsed() - RamUsageEstimator.sizeOf(values));
+            success = true;
+            return vector;
+        } finally {
+            if (success == false) {
+                values.close();
+            }
+        }
+    }
+
+    void writeArrayVector(int positions, StreamOutput out) throws IOException {
+        values.writeTo(out);
+    }
+
     @Override
     public LongBlock asBlock() {
         return new LongVectorBlock(this);

+ 4 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVector.java

@@ -81,6 +81,7 @@ public sealed interface LongVector extends Vector permits ConstantLongVector, Lo
             case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory);
             case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstantLongVector(in.readLong(), positions);
             case SERIALIZE_VECTOR_ARRAY -> LongArrayVector.readArrayVector(positions, in, blockFactory);
+            case SERIALIZE_VECTOR_BIG_ARRAY -> LongBigArrayVector.readArrayVector(positions, in, blockFactory);
             default -> {
                 assert false : "invalid vector serialization type [" + serializationType + "]";
                 throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]");
@@ -99,6 +100,9 @@ public sealed interface LongVector extends Vector permits ConstantLongVector, Lo
         } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof LongArrayVector v) {
             out.writeByte(SERIALIZE_VECTOR_ARRAY);
             v.writeArrayVector(positions, out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_VECTOR) && this instanceof LongBigArrayVector v) {
+            out.writeByte(SERIALIZE_VECTOR_BIG_ARRAY);
+            v.writeArrayVector(positions, out);
         } else {
             out.writeByte(SERIALIZE_VECTOR_VALUES);
             writeValues(this, positions, out);

+ 1 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Vector.java

@@ -82,4 +82,5 @@ public interface Vector extends Accountable, RefCounted, Releasable {
     byte SERIALIZE_VECTOR_VALUES = 0;
     byte SERIALIZE_VECTOR_CONSTANT = 1;
     byte SERIALIZE_VECTOR_ARRAY = 2;
+    byte SERIALIZE_VECTOR_BIG_ARRAY = 3;
 }

+ 30 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BigArrayVector.java.st

@@ -8,9 +8,13 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.$Array$;
 import org.elasticsearch.core.Releasable;
 
+import java.io.IOException;
+
 /**
  * Vector implementation that defers to an enclosed {@link $if(boolean)$Bit$else$$Type$$endif$Array}.
  * Does not take ownership of the array and does not adjust circuit breakers to account for it.
@@ -27,6 +31,32 @@ public final class $Type$BigArrayVector extends AbstractVector implements $Type$
         this.values = values;
     }
 
+    static $Type$BigArrayVector readArrayVector(int positions, StreamInput in, BlockFactory blockFactory) throws IOException {
+$if(boolean)$
+        $Array$ values = new BitArray(blockFactory.bigArrays(), true, in);
+$else$
+        $Array$ values = blockFactory.bigArrays().new$Type$Array(positions, false);
+$endif$
+        boolean success = false;
+        try {
+$if(boolean)$$else$
+            values.fillWith(in);
+$endif$
+            $Type$BigArrayVector vector = new $Type$BigArrayVector(values, positions, blockFactory);
+            blockFactory.adjustBreaker(vector.ramBytesUsed() - RamUsageEstimator.sizeOf(values));
+            success = true;
+            return vector;
+        } finally {
+            if (success == false) {
+                values.close();
+            }
+        }
+    }
+
+    void writeArrayVector(int positions, StreamOutput out) throws IOException {
+        values.writeTo(out);
+    }
+
     @Override
     public $Type$Block asBlock() {
         return new $Type$VectorBlock(this);

+ 8 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st

@@ -114,6 +114,9 @@ $endif$
             case SERIALIZE_VECTOR_VALUES -> readValues(positions, in, blockFactory);
             case SERIALIZE_VECTOR_CONSTANT -> blockFactory.newConstant$Type$Vector(in.read$Type$(), positions);
             case SERIALIZE_VECTOR_ARRAY -> $Type$ArrayVector.readArrayVector(positions, in, blockFactory);
+$if(BytesRef)$$else$
+            case SERIALIZE_VECTOR_BIG_ARRAY -> $Type$BigArrayVector.readArrayVector(positions, in, blockFactory);
+$endif$
             default -> {
                 assert false : "invalid vector serialization type [" + serializationType + "]";
                 throw new IllegalStateException("invalid vector serialization type [" + serializationType + "]");
@@ -136,6 +139,11 @@ $endif$
         } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_VECTOR) && this instanceof $Type$ArrayVector v) {
             out.writeByte(SERIALIZE_VECTOR_ARRAY);
             v.writeArrayVector(positions, out);
+$if(BytesRef)$$else$
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_VECTOR) && this instanceof $Type$BigArrayVector v) {
+            out.writeByte(SERIALIZE_VECTOR_BIG_ARRAY);
+            v.writeArrayVector(positions, out);
+$endif$
         } else {
             out.writeByte(SERIALIZE_VECTOR_VALUES);
             writeValues(this, positions, out);