Browse Source

Specialize serialization of array blocks (#106102)

A follow-up of #105893

Currently, we serialize blocks value by value, which is simple but 
effective. However, it would be more efficient to serialize the
underlying structures of array blocks instead.
Nhat Nguyen 1 year ago
parent
commit
721d9fadd7
16 changed files with 447 additions and 103 deletions
  1. 5 0
      docs/changelog/106102.yaml
  2. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  3. 25 0
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java
  4. 36 17
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java
  5. 25 0
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java
  6. 37 17
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java
  7. 25 0
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java
  8. 36 17
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java
  9. 25 0
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java
  10. 36 17
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java
  11. 25 0
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java
  12. 36 17
      x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java
  13. 62 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractArrayBlock.java
  14. 7 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java
  15. 26 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st
  16. 40 18
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st

+ 5 - 0
docs/changelog/106102.yaml

@@ -0,0 +1,5 @@
+pr: 106102
+summary: Specialize serialization of array blocks
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -139,6 +139,7 @@ public class TransportVersions {
     public static final TransportVersion ADD_FAILURE_STORE_INDICES_OPTIONS = def(8_599_00_0);
     public static final TransportVersion ESQL_ENRICH_OPERATOR_STATUS = def(8_600_00_0);
     public static final TransportVersion ESQL_SERIALIZE_ARRAY_VECTOR = def(8_601_00_0);
+    public static final TransportVersion ESQL_SERIALIZE_ARRAY_BLOCK = def(8_602_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 25 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java

@@ -8,8 +8,10 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Releasables;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -53,6 +55,29 @@ final class BooleanArrayBlock extends AbstractArrayBlock implements BooleanBlock
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static BooleanArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        BooleanArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = BooleanArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new BooleanArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public BooleanVector asVector() {
         return null;

+ 36 - 17
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.compute.data;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -49,10 +50,19 @@ public sealed interface BooleanBlock extends Block permits BooleanArrayBlock, Bo
     }
 
     private static BooleanBlock readFrom(BlockStreamInput in) throws IOException {
-        final boolean isVector = in.readBoolean();
-        if (isVector) {
-            return BooleanVector.readFrom(in.blockFactory(), in).asBlock();
-        }
+        final byte serializationType = in.readByte();
+        return switch (serializationType) {
+            case SERIALIZE_BLOCK_VALUES -> BooleanBlock.readValues(in);
+            case SERIALIZE_BLOCK_VECTOR -> BooleanVector.readFrom(in.blockFactory(), in).asBlock();
+            case SERIALIZE_BLOCK_ARRAY -> BooleanArrayBlock.readArrayBlock(in.blockFactory(), in);
+            default -> {
+                assert false : "invalid block serialization type " + serializationType;
+                throw new IllegalStateException("invalid serialization type " + serializationType);
+            }
+        };
+    }
+
+    private static BooleanBlock readValues(BlockStreamInput in) throws IOException {
         final int positions = in.readVInt();
         try (BooleanBlock.Builder builder = in.blockFactory().newBooleanBlockBuilder(positions)) {
             for (int i = 0; i < positions; i++) {
@@ -74,22 +84,31 @@ public sealed interface BooleanBlock extends Block permits BooleanArrayBlock, Bo
     @Override
     default void writeTo(StreamOutput out) throws IOException {
         BooleanVector vector = asVector();
-        out.writeBoolean(vector != null);
+        final var version = out.getTransportVersion();
         if (vector != null) {
+            out.writeByte(SERIALIZE_BLOCK_VECTOR);
             vector.writeTo(out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof BooleanArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_ARRAY);
+            b.writeArrayBlock(out);
         } else {
-            final int positions = getPositionCount();
-            out.writeVInt(positions);
-            for (int pos = 0; pos < positions; pos++) {
-                if (isNull(pos)) {
-                    out.writeBoolean(true);
-                } else {
-                    out.writeBoolean(false);
-                    final int valueCount = getValueCount(pos);
-                    out.writeVInt(valueCount);
-                    for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
-                        out.writeBoolean(getBoolean(getFirstValueIndex(pos) + valueIndex));
-                    }
+            out.writeByte(SERIALIZE_BLOCK_VALUES);
+            BooleanBlock.writeValues(this, out);
+        }
+    }
+
+    private static void writeValues(BooleanBlock block, StreamOutput out) throws IOException {
+        final int positions = block.getPositionCount();
+        out.writeVInt(positions);
+        for (int pos = 0; pos < positions; pos++) {
+            if (block.isNull(pos)) {
+                out.writeBoolean(true);
+            } else {
+                out.writeBoolean(false);
+                final int valueCount = block.getValueCount(pos);
+                out.writeVInt(valueCount);
+                for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+                    out.writeBoolean(block.getBoolean(block.getFirstValueIndex(pos) + valueIndex));
                 }
             }
         }

+ 25 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java

@@ -9,9 +9,11 @@ package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.BytesRefArray;
 import org.elasticsearch.core.Releasables;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -56,6 +58,29 @@ final class BytesRefArrayBlock extends AbstractArrayBlock implements BytesRefBlo
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static BytesRefArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        BytesRefArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = BytesRefArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new BytesRefArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public BytesRefVector asVector() {
         return null;

+ 37 - 17
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -53,10 +54,19 @@ public sealed interface BytesRefBlock extends Block permits BytesRefArrayBlock,
     }
 
     private static BytesRefBlock readFrom(BlockStreamInput in) throws IOException {
-        final boolean isVector = in.readBoolean();
-        if (isVector) {
-            return BytesRefVector.readFrom(in.blockFactory(), in).asBlock();
-        }
+        final byte serializationType = in.readByte();
+        return switch (serializationType) {
+            case SERIALIZE_BLOCK_VALUES -> BytesRefBlock.readValues(in);
+            case SERIALIZE_BLOCK_VECTOR -> BytesRefVector.readFrom(in.blockFactory(), in).asBlock();
+            case SERIALIZE_BLOCK_ARRAY -> BytesRefArrayBlock.readArrayBlock(in.blockFactory(), in);
+            default -> {
+                assert false : "invalid block serialization type " + serializationType;
+                throw new IllegalStateException("invalid serialization type " + serializationType);
+            }
+        };
+    }
+
+    private static BytesRefBlock readValues(BlockStreamInput in) throws IOException {
         final int positions = in.readVInt();
         try (BytesRefBlock.Builder builder = in.blockFactory().newBytesRefBlockBuilder(positions)) {
             for (int i = 0; i < positions; i++) {
@@ -78,22 +88,32 @@ public sealed interface BytesRefBlock extends Block permits BytesRefArrayBlock,
     @Override
     default void writeTo(StreamOutput out) throws IOException {
         BytesRefVector vector = asVector();
-        out.writeBoolean(vector != null);
+        final var version = out.getTransportVersion();
         if (vector != null) {
+            out.writeByte(SERIALIZE_BLOCK_VECTOR);
             vector.writeTo(out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof BytesRefArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_ARRAY);
+            b.writeArrayBlock(out);
         } else {
-            final int positions = getPositionCount();
-            out.writeVInt(positions);
-            for (int pos = 0; pos < positions; pos++) {
-                if (isNull(pos)) {
-                    out.writeBoolean(true);
-                } else {
-                    out.writeBoolean(false);
-                    final int valueCount = getValueCount(pos);
-                    out.writeVInt(valueCount);
-                    for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
-                        out.writeBytesRef(getBytesRef(getFirstValueIndex(pos) + valueIndex, new BytesRef()));
-                    }
+            out.writeByte(SERIALIZE_BLOCK_VALUES);
+            BytesRefBlock.writeValues(this, out);
+        }
+    }
+
+    private static void writeValues(BytesRefBlock block, StreamOutput out) throws IOException {
+        final int positions = block.getPositionCount();
+        out.writeVInt(positions);
+        for (int pos = 0; pos < positions; pos++) {
+            if (block.isNull(pos)) {
+                out.writeBoolean(true);
+            } else {
+                out.writeBoolean(false);
+                final int valueCount = block.getValueCount(pos);
+                out.writeVInt(valueCount);
+                var scratch = new BytesRef();
+                for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+                    out.writeBytesRef(block.getBytesRef(block.getFirstValueIndex(pos) + valueIndex, scratch));
                 }
             }
         }

+ 25 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java

@@ -8,8 +8,10 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Releasables;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -53,6 +55,29 @@ final class DoubleArrayBlock extends AbstractArrayBlock implements DoubleBlock {
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static DoubleArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        DoubleArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = DoubleArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new DoubleArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public DoubleVector asVector() {
         return null;

+ 36 - 17
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.compute.data;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -49,10 +50,19 @@ public sealed interface DoubleBlock extends Block permits DoubleArrayBlock, Doub
     }
 
     private static DoubleBlock readFrom(BlockStreamInput in) throws IOException {
-        final boolean isVector = in.readBoolean();
-        if (isVector) {
-            return DoubleVector.readFrom(in.blockFactory(), in).asBlock();
-        }
+        final byte serializationType = in.readByte();
+        return switch (serializationType) {
+            case SERIALIZE_BLOCK_VALUES -> DoubleBlock.readValues(in);
+            case SERIALIZE_BLOCK_VECTOR -> DoubleVector.readFrom(in.blockFactory(), in).asBlock();
+            case SERIALIZE_BLOCK_ARRAY -> DoubleArrayBlock.readArrayBlock(in.blockFactory(), in);
+            default -> {
+                assert false : "invalid block serialization type " + serializationType;
+                throw new IllegalStateException("invalid serialization type " + serializationType);
+            }
+        };
+    }
+
+    private static DoubleBlock readValues(BlockStreamInput in) throws IOException {
         final int positions = in.readVInt();
         try (DoubleBlock.Builder builder = in.blockFactory().newDoubleBlockBuilder(positions)) {
             for (int i = 0; i < positions; i++) {
@@ -74,22 +84,31 @@ public sealed interface DoubleBlock extends Block permits DoubleArrayBlock, Doub
     @Override
     default void writeTo(StreamOutput out) throws IOException {
         DoubleVector vector = asVector();
-        out.writeBoolean(vector != null);
+        final var version = out.getTransportVersion();
         if (vector != null) {
+            out.writeByte(SERIALIZE_BLOCK_VECTOR);
             vector.writeTo(out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof DoubleArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_ARRAY);
+            b.writeArrayBlock(out);
         } else {
-            final int positions = getPositionCount();
-            out.writeVInt(positions);
-            for (int pos = 0; pos < positions; pos++) {
-                if (isNull(pos)) {
-                    out.writeBoolean(true);
-                } else {
-                    out.writeBoolean(false);
-                    final int valueCount = getValueCount(pos);
-                    out.writeVInt(valueCount);
-                    for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
-                        out.writeDouble(getDouble(getFirstValueIndex(pos) + valueIndex));
-                    }
+            out.writeByte(SERIALIZE_BLOCK_VALUES);
+            DoubleBlock.writeValues(this, out);
+        }
+    }
+
+    private static void writeValues(DoubleBlock block, StreamOutput out) throws IOException {
+        final int positions = block.getPositionCount();
+        out.writeVInt(positions);
+        for (int pos = 0; pos < positions; pos++) {
+            if (block.isNull(pos)) {
+                out.writeBoolean(true);
+            } else {
+                out.writeBoolean(false);
+                final int valueCount = block.getValueCount(pos);
+                out.writeVInt(valueCount);
+                for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+                    out.writeDouble(block.getDouble(block.getFirstValueIndex(pos) + valueIndex));
                 }
             }
         }

+ 25 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java

@@ -8,8 +8,10 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Releasables;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -53,6 +55,29 @@ final class IntArrayBlock extends AbstractArrayBlock implements IntBlock {
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static IntArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        IntArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = IntArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new IntArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public IntVector asVector() {
         return null;

+ 36 - 17
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.compute.data;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -49,10 +50,19 @@ public sealed interface IntBlock extends Block permits IntArrayBlock, IntVectorB
     }
 
     private static IntBlock readFrom(BlockStreamInput in) throws IOException {
-        final boolean isVector = in.readBoolean();
-        if (isVector) {
-            return IntVector.readFrom(in.blockFactory(), in).asBlock();
-        }
+        final byte serializationType = in.readByte();
+        return switch (serializationType) {
+            case SERIALIZE_BLOCK_VALUES -> IntBlock.readValues(in);
+            case SERIALIZE_BLOCK_VECTOR -> IntVector.readFrom(in.blockFactory(), in).asBlock();
+            case SERIALIZE_BLOCK_ARRAY -> IntArrayBlock.readArrayBlock(in.blockFactory(), in);
+            default -> {
+                assert false : "invalid block serialization type " + serializationType;
+                throw new IllegalStateException("invalid serialization type " + serializationType);
+            }
+        };
+    }
+
+    private static IntBlock readValues(BlockStreamInput in) throws IOException {
         final int positions = in.readVInt();
         try (IntBlock.Builder builder = in.blockFactory().newIntBlockBuilder(positions)) {
             for (int i = 0; i < positions; i++) {
@@ -74,22 +84,31 @@ public sealed interface IntBlock extends Block permits IntArrayBlock, IntVectorB
     @Override
     default void writeTo(StreamOutput out) throws IOException {
         IntVector vector = asVector();
-        out.writeBoolean(vector != null);
+        final var version = out.getTransportVersion();
         if (vector != null) {
+            out.writeByte(SERIALIZE_BLOCK_VECTOR);
             vector.writeTo(out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof IntArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_ARRAY);
+            b.writeArrayBlock(out);
         } else {
-            final int positions = getPositionCount();
-            out.writeVInt(positions);
-            for (int pos = 0; pos < positions; pos++) {
-                if (isNull(pos)) {
-                    out.writeBoolean(true);
-                } else {
-                    out.writeBoolean(false);
-                    final int valueCount = getValueCount(pos);
-                    out.writeVInt(valueCount);
-                    for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
-                        out.writeInt(getInt(getFirstValueIndex(pos) + valueIndex));
-                    }
+            out.writeByte(SERIALIZE_BLOCK_VALUES);
+            IntBlock.writeValues(this, out);
+        }
+    }
+
+    private static void writeValues(IntBlock block, StreamOutput out) throws IOException {
+        final int positions = block.getPositionCount();
+        out.writeVInt(positions);
+        for (int pos = 0; pos < positions; pos++) {
+            if (block.isNull(pos)) {
+                out.writeBoolean(true);
+            } else {
+                out.writeBoolean(false);
+                final int valueCount = block.getValueCount(pos);
+                out.writeVInt(valueCount);
+                for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+                    out.writeInt(block.getInt(block.getFirstValueIndex(pos) + valueIndex));
                 }
             }
         }

+ 25 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java

@@ -8,8 +8,10 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Releasables;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -53,6 +55,29 @@ final class LongArrayBlock extends AbstractArrayBlock implements LongBlock {
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static LongArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        LongArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = LongArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new LongArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public LongVector asVector() {
         return null;

+ 36 - 17
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.compute.data;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -49,10 +50,19 @@ public sealed interface LongBlock extends Block permits LongArrayBlock, LongVect
     }
 
     private static LongBlock readFrom(BlockStreamInput in) throws IOException {
-        final boolean isVector = in.readBoolean();
-        if (isVector) {
-            return LongVector.readFrom(in.blockFactory(), in).asBlock();
-        }
+        final byte serializationType = in.readByte();
+        return switch (serializationType) {
+            case SERIALIZE_BLOCK_VALUES -> LongBlock.readValues(in);
+            case SERIALIZE_BLOCK_VECTOR -> LongVector.readFrom(in.blockFactory(), in).asBlock();
+            case SERIALIZE_BLOCK_ARRAY -> LongArrayBlock.readArrayBlock(in.blockFactory(), in);
+            default -> {
+                assert false : "invalid block serialization type " + serializationType;
+                throw new IllegalStateException("invalid serialization type " + serializationType);
+            }
+        };
+    }
+
+    private static LongBlock readValues(BlockStreamInput in) throws IOException {
         final int positions = in.readVInt();
         try (LongBlock.Builder builder = in.blockFactory().newLongBlockBuilder(positions)) {
             for (int i = 0; i < positions; i++) {
@@ -74,22 +84,31 @@ public sealed interface LongBlock extends Block permits LongArrayBlock, LongVect
     @Override
     default void writeTo(StreamOutput out) throws IOException {
         LongVector vector = asVector();
-        out.writeBoolean(vector != null);
+        final var version = out.getTransportVersion();
         if (vector != null) {
+            out.writeByte(SERIALIZE_BLOCK_VECTOR);
             vector.writeTo(out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof LongArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_ARRAY);
+            b.writeArrayBlock(out);
         } else {
-            final int positions = getPositionCount();
-            out.writeVInt(positions);
-            for (int pos = 0; pos < positions; pos++) {
-                if (isNull(pos)) {
-                    out.writeBoolean(true);
-                } else {
-                    out.writeBoolean(false);
-                    final int valueCount = getValueCount(pos);
-                    out.writeVInt(valueCount);
-                    for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
-                        out.writeLong(getLong(getFirstValueIndex(pos) + valueIndex));
-                    }
+            out.writeByte(SERIALIZE_BLOCK_VALUES);
+            LongBlock.writeValues(this, out);
+        }
+    }
+
+    private static void writeValues(LongBlock block, StreamOutput out) throws IOException {
+        final int positions = block.getPositionCount();
+        out.writeVInt(positions);
+        for (int pos = 0; pos < positions; pos++) {
+            if (block.isNull(pos)) {
+                out.writeBoolean(true);
+            } else {
+                out.writeBoolean(false);
+                final int valueCount = block.getValueCount(pos);
+                out.writeVInt(valueCount);
+                for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+                    out.writeLong(block.getLong(block.getFirstValueIndex(pos) + valueIndex));
                 }
             }
         }

+ 62 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AbstractArrayBlock.java

@@ -7,8 +7,11 @@
 
 package org.elasticsearch.compute.data;
 
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Nullable;
 
+import java.io.IOException;
 import java.util.BitSet;
 
 abstract class AbstractArrayBlock extends AbstractNonThreadSafeRefCounted implements Block {
@@ -118,4 +121,63 @@ abstract class AbstractArrayBlock extends AbstractNonThreadSafeRefCounted implem
     public final boolean areAllValuesNull() {
         return nullValuesCount() == getPositionCount();
     }
+
+    static final class SubFields {
+        long bytesReserved = 0;
+        final int positionCount;
+        final int[] firstValueIndexes;
+        final BitSet nullsMask;
+        final MvOrdering mvOrdering;
+
+        SubFields(BlockFactory blockFactory, StreamInput in) throws IOException {
+            this.positionCount = in.readVInt();
+            boolean success = false;
+            try {
+                if (in.readBoolean()) {
+                    bytesReserved += blockFactory.preAdjustBreakerForInt(positionCount + 1);
+                    final int[] values = new int[positionCount + 1];
+                    values[0] = in.readVInt();
+                    for (int i = 1; i <= positionCount; i++) {
+                        values[i] = values[i - 1] + in.readVInt();
+                    }
+                    this.firstValueIndexes = values;
+                } else {
+                    this.firstValueIndexes = null;
+                }
+                if (in.readBoolean()) {
+                    bytesReserved += blockFactory.preAdjustBreakerForLong(positionCount / Long.BYTES);
+                    nullsMask = BitSet.valueOf(in.readLongArray());
+                } else {
+                    nullsMask = null;
+                }
+                this.mvOrdering = in.readEnum(MvOrdering.class);
+                success = true;
+            } finally {
+                if (success == false) {
+                    blockFactory.adjustBreaker(-bytesReserved);
+                }
+            }
+        }
+
+        int vectorPositions() {
+            return firstValueIndexes == null ? positionCount : firstValueIndexes[positionCount];
+        }
+    }
+
+    void writeSubFields(StreamOutput out) throws IOException {
+        out.writeVInt(positionCount);
+        out.writeBoolean(firstValueIndexes != null);
+        if (firstValueIndexes != null) {
+            // firstValueIndexes are monotonic increasing
+            out.writeVInt(firstValueIndexes[0]);
+            for (int i = 1; i <= positionCount; i++) {
+                out.writeVInt(firstValueIndexes[i] - firstValueIndexes[i - 1]);
+            }
+        }
+        out.writeBoolean(nullsMask != null);
+        if (nullsMask != null) {
+            out.writeLongArray(nullsMask.toLongArray());
+        }
+        out.writeEnum(mvOrdering);
+    }
 }

+ 7 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java

@@ -239,4 +239,11 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R
             ConstantNullBlock.ENTRY
         );
     }
+
+    /**
+     * Serialization type for blocks: 0 and 1 replace false/true used in pre-8.14
+     */
+    byte SERIALIZE_BLOCK_VALUES = 0;
+    byte SERIALIZE_BLOCK_VECTOR = 1;
+    byte SERIALIZE_BLOCK_ARRAY = 2;
 }

+ 26 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st

@@ -10,14 +10,17 @@ package org.elasticsearch.compute.data;
 $if(BytesRef)$
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.BytesRefArray;
 import org.elasticsearch.core.Releasables;
 
 $else$
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Releasables;
 
 $endif$
+import java.io.IOException;
 import java.util.BitSet;
 
 /**
@@ -64,6 +67,29 @@ final class $Type$ArrayBlock extends AbstractArrayBlock implements $Type$Block {
             : firstValueIndexes[getPositionCount()] == vector.getPositionCount();
     }
 
+    static $Type$ArrayBlock readArrayBlock(BlockFactory blockFactory, BlockStreamInput in) throws IOException {
+        final SubFields sub = new SubFields(blockFactory, in);
+        $Type$ArrayVector vector = null;
+        boolean success = false;
+        try {
+            vector = $Type$ArrayVector.readArrayVector(sub.vectorPositions(), in, blockFactory);
+            var block = new $Type$ArrayBlock(vector, sub.positionCount, sub.firstValueIndexes, sub.nullsMask, sub.mvOrdering);
+            blockFactory.adjustBreaker(block.ramBytesUsed() - vector.ramBytesUsed() - sub.bytesReserved);
+            success = true;
+            return block;
+        } finally {
+            if (success == false) {
+                Releasables.close(vector);
+                blockFactory.adjustBreaker(-sub.bytesReserved);
+            }
+        }
+    }
+
+    void writeArrayBlock(StreamOutput out) throws IOException {
+        writeSubFields(out);
+        vector.writeArrayVector(vector.getPositionCount(), out);
+    }
+
     @Override
     public $Type$Vector asVector() {
         return null;

+ 40 - 18
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st

@@ -10,6 +10,7 @@ package org.elasticsearch.compute.data;
 $if(BytesRef)$
 import org.apache.lucene.util.BytesRef;
 $endif$
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -64,10 +65,19 @@ $endif$
     }
 
     private static $Type$Block readFrom(BlockStreamInput in) throws IOException {
-        final boolean isVector = in.readBoolean();
-        if (isVector) {
-            return $Type$Vector.readFrom(in.blockFactory(), in).asBlock();
-        }
+        final byte serializationType = in.readByte();
+        return switch (serializationType) {
+            case SERIALIZE_BLOCK_VALUES -> $Type$Block.readValues(in);
+            case SERIALIZE_BLOCK_VECTOR -> $Type$Vector.readFrom(in.blockFactory(), in).asBlock();
+            case SERIALIZE_BLOCK_ARRAY -> $Type$ArrayBlock.readArrayBlock(in.blockFactory(), in);
+            default -> {
+                assert false : "invalid block serialization type " + serializationType;
+                throw new IllegalStateException("invalid serialization type " + serializationType);
+            }
+        };
+    }
+
+    private static $Type$Block readValues(BlockStreamInput in) throws IOException {
         final int positions = in.readVInt();
         try ($Type$Block.Builder builder = in.blockFactory().new$Type$BlockBuilder(positions)) {
             for (int i = 0; i < positions; i++) {
@@ -89,26 +99,38 @@ $endif$
     @Override
     default void writeTo(StreamOutput out) throws IOException {
         $Type$Vector vector = asVector();
-        out.writeBoolean(vector != null);
+        final var version = out.getTransportVersion();
         if (vector != null) {
+            out.writeByte(SERIALIZE_BLOCK_VECTOR);
             vector.writeTo(out);
+        } else if (version.onOrAfter(TransportVersions.ESQL_SERIALIZE_ARRAY_BLOCK) && this instanceof $Type$ArrayBlock b) {
+            out.writeByte(SERIALIZE_BLOCK_ARRAY);
+            b.writeArrayBlock(out);
         } else {
-            final int positions = getPositionCount();
-            out.writeVInt(positions);
-            for (int pos = 0; pos < positions; pos++) {
-                if (isNull(pos)) {
-                    out.writeBoolean(true);
-                } else {
-                    out.writeBoolean(false);
-                    final int valueCount = getValueCount(pos);
-                    out.writeVInt(valueCount);
-                    for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+            out.writeByte(SERIALIZE_BLOCK_VALUES);
+            $Type$Block.writeValues(this, out);
+        }
+    }
+
+    private static void writeValues($Type$Block block, StreamOutput out) throws IOException {
+        final int positions = block.getPositionCount();
+        out.writeVInt(positions);
+        for (int pos = 0; pos < positions; pos++) {
+            if (block.isNull(pos)) {
+                out.writeBoolean(true);
+            } else {
+                out.writeBoolean(false);
+                final int valueCount = block.getValueCount(pos);
+                out.writeVInt(valueCount);
 $if(BytesRef)$
-                        out.write$Type$(get$Type$(getFirstValueIndex(pos) + valueIndex, new BytesRef()));
+                var scratch = new BytesRef();
+$endif$
+                for (int valueIndex = 0; valueIndex < valueCount; valueIndex++) {
+$if(BytesRef)$
+                    out.write$Type$(block.get$Type$(block.getFirstValueIndex(pos) + valueIndex, scratch));
 $else$
-                        out.write$Type$(get$Type$(getFirstValueIndex(pos) + valueIndex));
+                    out.write$Type$(block.get$Type$(block.getFirstValueIndex(pos) + valueIndex));
 $endif$
-                    }
                 }
             }
         }