Browse Source

Serialize big array blocks (#106373)

Similar to ArrayBlocks, this change serializes the underlying
structure of BigArrayBlocks.
Nhat Nguyen 1 year ago
parent
commit
b6f876f32a

+ 5 - 0
docs/changelog/106373.yaml

@@ -0,0 +1,5 @@
+pr: 106373
+summary: Serialize big array blocks
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -147,6 +147,7 @@ public class TransportVersions {
     public static final TransportVersion ESQL_EXTENDED_ENRICH_INPUT_TYPE = def(8_607_00_0);
     public static final TransportVersion ESQL_SERIALIZE_BIG_VECTOR = def(8_608_00_0);
     public static final TransportVersion AGGS_EXCLUDED_DELETED_DOCS = def(8_609_00_0);
+    public static final TransportVersion ESQL_SERIALIZE_BIG_ARRAY = def(8_610_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 25 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBigArrayBlock.java

@@ -8,9 +8,11 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.core.Releasables;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -54,6 +56,29 @@ public final class BooleanBigArrayBlock extends AbstractArrayBlock implements Bo
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static BooleanBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        BooleanBigArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = BooleanBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new BooleanBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public BooleanVector asVector() {
         return null;

+ 4 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java

@@ -55,6 +55,7 @@ public sealed interface BooleanBlock extends Block permits BooleanArrayBlock, Bo
             case SERIALIZE_BLOCK_VALUES -> BooleanBlock.readValues(in);
             case SERIALIZE_BLOCK_VECTOR -> BooleanVector.readFrom(in.blockFactory(), in).asBlock();
             case SERIALIZE_BLOCK_ARRAY -> BooleanArrayBlock.readArrayBlock(in.blockFactory(), in);
+            case SERIALIZE_BLOCK_BIG_ARRAY -> BooleanBigArrayBlock.readArrayBlock(in.blockFactory(), in);
             default -> {
                 assert false : "invalid block serialization type " + serializationType;
                 throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ public sealed interface BooleanBlock extends Block permits BooleanArrayBlock, Bo
         } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof BooleanArrayBlock b) {
             out.writeByte(SERIALIZE_BLOCK_ARRAY);
             b.writeArrayBlock(out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof BooleanBigArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
+            b.writeArrayBlock(out);
         } else {
             out.writeByte(SERIALIZE_BLOCK_VALUES);
             BooleanBlock.writeValues(this, out);

+ 25 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBigArrayBlock.java

@@ -8,9 +8,11 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.DoubleArray;
 import org.elasticsearch.core.Releasables;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -54,6 +56,29 @@ public final class DoubleBigArrayBlock extends AbstractArrayBlock implements Dou
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static DoubleBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        DoubleBigArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = DoubleBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new DoubleBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public DoubleVector asVector() {
         return null;

+ 4 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java

@@ -55,6 +55,7 @@ public sealed interface DoubleBlock extends Block permits DoubleArrayBlock, Doub
             case SERIALIZE_BLOCK_VALUES -> DoubleBlock.readValues(in);
             case SERIALIZE_BLOCK_VECTOR -> DoubleVector.readFrom(in.blockFactory(), in).asBlock();
             case SERIALIZE_BLOCK_ARRAY -> DoubleArrayBlock.readArrayBlock(in.blockFactory(), in);
+            case SERIALIZE_BLOCK_BIG_ARRAY -> DoubleBigArrayBlock.readArrayBlock(in.blockFactory(), in);
             default -> {
                 assert false : "invalid block serialization type " + serializationType;
                 throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ public sealed interface DoubleBlock extends Block permits DoubleArrayBlock, Doub
         } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof DoubleArrayBlock b) {
             out.writeByte(SERIALIZE_BLOCK_ARRAY);
             b.writeArrayBlock(out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof DoubleBigArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
+            b.writeArrayBlock(out);
         } else {
             out.writeByte(SERIALIZE_BLOCK_VALUES);
             DoubleBlock.writeValues(this, out);

+ 25 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBigArrayBlock.java

@@ -8,9 +8,11 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.IntArray;
 import org.elasticsearch.core.Releasables;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -54,6 +56,29 @@ public final class IntBigArrayBlock extends AbstractArrayBlock implements IntBlo
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static IntBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        IntBigArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = IntBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new IntBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public IntVector asVector() {
         return null;

+ 4 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java

@@ -55,6 +55,7 @@ public sealed interface IntBlock extends Block permits IntArrayBlock, IntVectorB
             case SERIALIZE_BLOCK_VALUES -> IntBlock.readValues(in);
             case SERIALIZE_BLOCK_VECTOR -> IntVector.readFrom(in.blockFactory(), in).asBlock();
             case SERIALIZE_BLOCK_ARRAY -> IntArrayBlock.readArrayBlock(in.blockFactory(), in);
+            case SERIALIZE_BLOCK_BIG_ARRAY -> IntBigArrayBlock.readArrayBlock(in.blockFactory(), in);
             default -> {
                 assert false : "invalid block serialization type " + serializationType;
                 throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ public sealed interface IntBlock extends Block permits IntArrayBlock, IntVectorB
         } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof IntArrayBlock b) {
             out.writeByte(SERIALIZE_BLOCK_ARRAY);
             b.writeArrayBlock(out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof IntBigArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
+            b.writeArrayBlock(out);
         } else {
             out.writeByte(SERIALIZE_BLOCK_VALUES);
             IntBlock.writeValues(this, out);

+ 25 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBigArrayBlock.java

@@ -8,9 +8,11 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.LongArray;
 import org.elasticsearch.core.Releasables;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -54,6 +56,29 @@ public final class LongBigArrayBlock extends AbstractArrayBlock implements LongB
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static LongBigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        LongBigArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = LongBigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new LongBigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public LongVector asVector() {
         return null;

+ 4 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java

@@ -55,6 +55,7 @@ public sealed interface LongBlock extends Block permits LongArrayBlock, LongVect
             case SERIALIZE_BLOCK_VALUES -> LongBlock.readValues(in);
             case SERIALIZE_BLOCK_VECTOR -> LongVector.readFrom(in.blockFactory(), in).asBlock();
             case SERIALIZE_BLOCK_ARRAY -> LongArrayBlock.readArrayBlock(in.blockFactory(), in);
+            case SERIALIZE_BLOCK_BIG_ARRAY -> LongBigArrayBlock.readArrayBlock(in.blockFactory(), in);
             default -> {
                 assert false : "invalid block serialization type " + serializationType;
                 throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -91,6 +92,9 @@ public sealed interface LongBlock extends Block permits LongArrayBlock, LongVect
         } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof LongArrayBlock b) {
             out.writeByte(SERIALIZE_BLOCK_ARRAY);
             b.writeArrayBlock(out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof LongBigArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
+            b.writeArrayBlock(out);
         } else {
             out.writeByte(SERIALIZE_BLOCK_VALUES);
             LongBlock.writeValues(this, out);

+ 1 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java

@@ -246,4 +246,5 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R
     byte SERIALIZE_BLOCK_VALUES = 0;
     byte SERIALIZE_BLOCK_VECTOR = 1;
     byte SERIALIZE_BLOCK_ARRAY = 2;
+    byte SERIALIZE_BLOCK_BIG_ARRAY = 3;
 }

+ 25 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BigArrayBlock.java.st

@@ -8,9 +8,11 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.$Array$;
 import org.elasticsearch.core.Releasables;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -54,6 +56,29 @@ public final class $Type$BigArrayBlock extends AbstractArrayBlock implements $Ty
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static $Type$BigArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        $Type$BigArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = $Type$BigArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new $Type$BigArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public $Type$Vector asVector() {
         return null;

+ 8 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st

@@ -70,6 +70,9 @@ $endif$
             case SERIALIZE_BLOCK_VALUES -> $Type$Block.readValues(in);
             case SERIALIZE_BLOCK_VECTOR -> $Type$Vector.readFrom(in.blockFactory(), in).asBlock();
             case SERIALIZE_BLOCK_ARRAY -> $Type$ArrayBlock.readArrayBlock(in.blockFactory(), in);
+$if(BytesRef)$$else$
+            case SERIALIZE_BLOCK_BIG_ARRAY -> $Type$BigArrayBlock.readArrayBlock(in.blockFactory(), in);
+$endif$
             default -> {
                 assert false : "invalid block serialization type " + serializationType;
                 throw new IllegalStateException("invalid serialization type " + serializationType);
@@ -106,6 +109,11 @@ $endif$
         } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof $Type$ArrayBlock b) {
             out.writeByte(SERIALIZE_BLOCK_ARRAY);
             b.writeArrayBlock(out);
+$if(BytesRef)$$else$
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_BIG_ARRAY) && this instanceof $Type$BigArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_BIG_ARRAY);
+            b.writeArrayBlock(out);
+$endif$
         } else {
             out.writeByte(SERIALIZE_BLOCK_VALUES);
             $Type$Block.writeValues(this, out);