
ESQL: Split large pages on load sometimes (#131053) (#132036)

This adds support for splitting `Page`s of large values when loading
from a single segment with non-descending hits. This is the hottest code
path, as it's how we load data for aggregations. So! We had to make very,
very, very sure this doesn't slow down the fast path of loading doc values.

Caveat: this only defends against loading large values via the
row-by-row load mechanism that we use for stored fields and `_source`.
That covers the most common kinds of large values, mostly `text` and
geo fields. If we ever need to split on doc values too, we'll have to
invent something for them specifically. For now, it's just row-by-row.

This works by flipping the order in which we load row-by-row and
column-at-a-time values. Previously we loaded all of the column-at-a-time
values first because that was simpler, and then loaded all of the
row-by-row values. Now we defer the column-at-a-time values and instead
load row-by-row until the `Page`'s estimated size is larger than a "jumbo"
size, which defaults to one megabyte.
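
To make the threshold concrete, here is a minimal, self-contained sketch of
that loop. Every name in it (`JumboSplitSketch`, `loadRowStride`, `loadRow`)
is made up for illustration; the real logic lives in the
`ValuesFromSingleReader` changes below:

```java
import java.util.List;
import java.util.function.IntFunction;

/**
 * Illustrative sketch only. These are not the real Elasticsearch classes;
 * they just model "load row-by-row until the page estimate goes jumbo".
 */
class JumboSplitSketch {
    static final long JUMBO_BYTES = 1024 * 1024; // the default "jumbo" size

    /**
     * Loads rows [offset, docCount), stopping early once the size estimate
     * crosses JUMBO_BYTES. Returns the index of the first unloaded row.
     */
    static int loadRowStride(List<String> page, int offset, int docCount, IntFunction<String> loadRow) {
        long estimatedBytes = 0;
        for (int row = offset; row < docCount; row++) {
            String value = loadRow.apply(row); // stand-in for stored fields / _source loading
            page.add(value);
            estimatedBytes += value.length();
            if (estimatedBytes > JUMBO_BYTES) {
                return row + 1; // we crossed to "jumbo" after this row
            }
        }
        return docCount;
    }
}
```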

Once we've loaded enough rows that we estimate the page is "jumbo", we
stop loading rows. The `Page` will look like this:

```
| txt1 | int | txt2 | long | double |
|------|-----|------|------|--------|
| XXXX |     | XXXX |      |        |
| XXXX |     | XXXX |      |        |
| XXXX |     | XXXX |      |        |
| XXXX |     | XXXX |      |        |
| XXXX |     | XXXX |      |        |
| XXXX |     | XXXX |      |        | <-- after loading this row
|      |     |      |      |        |     we crossed to "jumbo" size
|      |     |      |      |        |
|      |     |      |      |        |
|      |     |      |      |        | <-- these rows are entirely empty
|      |     |      |      |        |
|      |     |      |      |        |
```

Then we chop the `Page` down to the last loaded row:
```
| txt1 | int | txt2 | long | double |
|------|-----|------|------|--------|
| XXXX |     | XXXX |      |        |
| XXXX |     | XXXX |      |        |
| XXXX |     | XXXX |      |        |
| XXXX |     | XXXX |      |        |
| XXXX |     | XXXX |      |        |
| XXXX |     | XXXX |      |        |
```

Then we fill in the column-at-a-time columns:
```
| txt1 | int | txt2 | long | double |
|------|-----|------|------|--------|
| XXXX |   1 | XXXX |   11 |    1.0 |
| XXXX |   2 | XXXX |   22 |   -2.0 |
| XXXX |   3 | XXXX |   33 |    1e9 |
| XXXX |   4 | XXXX |   44 |    913 |
| XXXX |   5 | XXXX |   55 | 0.1234 |
| XXXX |   6 | XXXX |   66 | 3.1415 |
```

And then we return *that* `Page`. On the next `Driver` iteration we
start from where we left off.
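
For the resume behavior, a hedged continuation of the sketch above: each
call emits one chopped-and-filled page, and the next call starts at the
returned offset. Again, every name here is illustrative, not the actual
operator code:

```java
// Continues JumboSplitSketch above. Each loop iteration models one Driver
// iteration: load rows until "jumbo", emit that page, resume at the offset.
static List<List<String>> loadAllPages(int docCount, java.util.function.IntFunction<String> loadRow) {
    List<List<String>> pages = new java.util.ArrayList<>();
    int offset = 0;
    while (offset < docCount) {
        List<String> page = new java.util.ArrayList<>();
        int next = loadRowStride(page, offset, docCount, loadRow);
        // The real code would now fill in the column-at-a-time columns for
        // rows [offset, next) before returning the Page.
        pages.add(page);
        offset = next;
    }
    return pages;
}
```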
Nik Everett 2 months ago
parent
commit
6f2578ed6e
48 changed files with 732 additions and 234 deletions
  1. + 2 - 3     benchmarks/README.md
  2. + 7 - 1     benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java
  3. + 5 - 0     docs/changelog/131053.yaml
  4. + 3 - 3     server/src/main/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapper.java
  5. + 43 - 43   server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java
  6. + 9 - 9     server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java
  7. + 3 - 3     server/src/main/java/org/elasticsearch/index/mapper/BooleanScriptBlockDocValuesReader.java
  8. + 3 - 3     server/src/main/java/org/elasticsearch/index/mapper/DateScriptBlockDocValuesReader.java
  9. + 3 - 3     server/src/main/java/org/elasticsearch/index/mapper/DoubleScriptBlockDocValuesReader.java
  10. + 3 - 3    server/src/main/java/org/elasticsearch/index/mapper/IpScriptBlockDocValuesReader.java
  11. + 3 - 3    server/src/main/java/org/elasticsearch/index/mapper/KeywordScriptBlockDocValuesReader.java
  12. + 3 - 3    server/src/main/java/org/elasticsearch/index/mapper/LongScriptBlockDocValuesReader.java
  13. + 1 - 1    server/src/test/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapperTests.java
  14. + 1 - 1    server/src/test/java/org/elasticsearch/index/mapper/BlockSourceReaderTests.java
  15. + 2 - 1    server/src/test/java/org/elasticsearch/index/mapper/BooleanScriptFieldTypeTests.java
  16. + 2 - 1    server/src/test/java/org/elasticsearch/index/mapper/DateScriptFieldTypeTests.java
  17. + 2 - 1    server/src/test/java/org/elasticsearch/index/mapper/DoubleScriptFieldTypeTests.java
  18. + 2 - 1    server/src/test/java/org/elasticsearch/index/mapper/IpScriptFieldTypeTests.java
  19. + 2 - 1    server/src/test/java/org/elasticsearch/index/mapper/KeywordScriptFieldTypeTests.java
  20. + 2 - 1    server/src/test/java/org/elasticsearch/index/mapper/LongScriptFieldTypeTests.java
  21. + 1 - 0    test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/Clusters.java
  22. + 87 - 4   test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
  23. + 3 - 4    test/framework/src/main/java/org/elasticsearch/index/mapper/AbstractScriptFieldTypeTestCase.java
  24. + 36 - 7   test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java
  25. + 80 - 15  test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java
  26. + 0 - 1    x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java
  27. + 5 - 7    x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java
  28. + 33 - 4   x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java
  29. + 76 - 37  x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java
  30. + 0 - 3    x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java
  31. + 69 - 0   x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReaderDocs.java
  32. + 26 - 5   x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java
  33. + 1 - 0    x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
  34. + 2 - 1    x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java
  35. + 2 - 0    x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java
  36. + 23 - 10  x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java
  37. + 67 - 10  x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java
  38. + 2 - 0    x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java
  39. + 1 - 0    x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
  40. + 12 - 10  x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
  41. + 64 - 0   x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java
  42. + 4 - 5    x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
  43. + 3 - 10   x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java
  44. + 2 - 1    x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java
  45. + 8 - 1    x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
  46. + 7 - 1    x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java
  47. + 15 - 11  x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateMetricDoubleFieldMapper.java
  48. + 2 - 2    x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java

+ 2 - 3
benchmarks/README.md

@@ -152,11 +152,10 @@ exit
 Grab the async profiler from https://github.com/jvm-profiling-tools/async-profiler
 and run `prof async` like so:
 ```
-gradlew -p benchmarks/ run --args 'LongKeyedBucketOrdsBenchmark.multiBucket -prof "async:libPath=/home/nik9000/Downloads/async-profiler-3.0-29ee888-linux-x64/lib/libasyncProfiler.so;dir=/tmp/prof;output=flamegraph"'
+gradlew -p benchmarks/ run --args 'LongKeyedBucketOrdsBenchmark.multiBucket -prof "async:libPath=/home/nik9000/Downloads/async-profiler-4.0-linux-x64/lib/libasyncProfiler.so;dir=/tmp/prof;output=flamegraph"'
 ```
 
-Note: As of January 2025 the latest release of async profiler doesn't work
-      with our JDK but the nightly is fine.
+Note: As of July 2025 the 4.0 release of the async profiler works well.
 
 If you are on Mac, this'll warn you that you downloaded the shared library from
 the internet. You'll need to go to settings and allow it to run.

+ 7 - 1
benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

@@ -24,8 +24,10 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.common.logging.LogConfigurator;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefBlock;
@@ -84,10 +86,13 @@ import java.util.stream.IntStream;
 @State(Scope.Thread)
 @Fork(1)
 public class ValuesSourceReaderBenchmark {
+    static {
+        LogConfigurator.configureESLogging();
+    }
+
     private static final int BLOCK_LENGTH = 16 * 1024;
     private static final int INDEX_SIZE = 10 * BLOCK_LENGTH;
     private static final int COMMIT_INTERVAL = 500;
-    private static final BigArrays BIG_ARRAYS = BigArrays.NON_RECYCLING_INSTANCE;
     private static final BlockFactory blockFactory = BlockFactory.getInstance(
         new NoopCircuitBreaker("noop"),
         BigArrays.NON_RECYCLING_INSTANCE
@@ -296,6 +301,7 @@ public class ValuesSourceReaderBenchmark {
     public void benchmark() {
         ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
             blockFactory,
+            ByteSizeValue.ofMb(1).getBytes(),
             fields(name),
             List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
                 throw new UnsupportedOperationException("can't load _source here");

+ 5 - 0
docs/changelog/131053.yaml

@@ -0,0 +1,5 @@
+pr: 131053
+summary: Split large pages on load sometimes
+area: ES|QL
+type: bug
+issues: []

+ 3 - 3
server/src/main/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapper.java

@@ -98,11 +98,11 @@ public abstract class AbstractShapeGeometryFieldMapper<T> extends AbstractGeomet
             public BlockLoader.AllReader reader(LeafReaderContext context) throws IOException {
                 return new BlockLoader.AllReader() {
                     @Override
-                    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException {
+                    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
                         var binaryDocValues = context.reader().getBinaryDocValues(fieldName);
                         var reader = new GeometryDocValueReader();
-                        try (var builder = factory.ints(docs.count())) {
-                            for (int i = 0; i < docs.count(); i++) {
+                        try (var builder = factory.ints(docs.count() - offset)) {
+                            for (int i = offset; i < docs.count(); i++) {
                                 read(binaryDocValues, docs.get(i), reader, builder);
                             }
                             return builder.build();

+ 43 - 43
server/src/main/java/org/elasticsearch/index/mapper/BlockDocValuesReader.java

@@ -123,10 +123,10 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count())) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count() - offset)) {
                 int lastDoc = -1;
-                for (int i = 0; i < docs.count(); i++) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < lastDoc) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -172,9 +172,9 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count())) {
-                for (int i = 0; i < docs.count(); i++) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BlockLoader.LongBuilder builder = factory.longsFromDocValues(docs.count() - offset)) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < this.docID) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -258,10 +258,10 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BlockLoader.IntBuilder builder = factory.intsFromDocValues(docs.count())) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BlockLoader.IntBuilder builder = factory.intsFromDocValues(docs.count() - offset)) {
                 int lastDoc = -1;
-                for (int i = 0; i < docs.count(); i++) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < lastDoc) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -307,9 +307,9 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BlockLoader.IntBuilder builder = factory.intsFromDocValues(docs.count())) {
-                for (int i = 0; i < docs.count(); i++) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BlockLoader.IntBuilder builder = factory.intsFromDocValues(docs.count() - offset)) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < this.docID) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -407,10 +407,10 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BlockLoader.DoubleBuilder builder = factory.doublesFromDocValues(docs.count())) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BlockLoader.DoubleBuilder builder = factory.doublesFromDocValues(docs.count() - offset)) {
                 int lastDoc = -1;
-                for (int i = 0; i < docs.count(); i++) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < lastDoc) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -460,9 +460,9 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BlockLoader.DoubleBuilder builder = factory.doublesFromDocValues(docs.count())) {
-                for (int i = 0; i < docs.count(); i++) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BlockLoader.DoubleBuilder builder = factory.doublesFromDocValues(docs.count() - offset)) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < this.docID) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -541,10 +541,10 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
             // Doubles from doc values ensures that the values are in order
-            try (BlockLoader.FloatBuilder builder = factory.denseVectors(docs.count(), dimensions)) {
-                for (int i = 0; i < docs.count(); i++) {
+            try (BlockLoader.FloatBuilder builder = factory.denseVectors(docs.count() - offset, dimensions)) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < floatVectorValues.docID()) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -642,19 +642,19 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
             if (ordinals.advanceExact(docId)) {
                 BytesRef v = ordinals.lookupOrd(ordinals.ordValue());
                 // the returned BytesRef can be reused
-                return factory.constantBytes(BytesRef.deepCopyOf(v));
+                return factory.constantBytes(BytesRef.deepCopyOf(v), 1);
             } else {
-                return factory.constantNulls();
+                return factory.constantNulls(1);
             }
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            if (docs.count() == 1) {
-                return readSingleDoc(factory, docs.get(0));
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            if (docs.count() - offset == 1) {
+                return readSingleDoc(factory, docs.get(offset));
             }
-            try (BlockLoader.SingletonOrdinalsBuilder builder = factory.singletonOrdinalsBuilder(ordinals, docs.count())) {
-                for (int i = 0; i < docs.count(); i++) {
+            try (var builder = factory.singletonOrdinalsBuilder(ordinals, docs.count() - offset)) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < ordinals.docID()) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -697,9 +697,9 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BytesRefBuilder builder = factory.bytesRefsFromDocValues(docs.count())) {
-                for (int i = 0; i < docs.count(); i++) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BytesRefBuilder builder = factory.bytesRefsFromDocValues(docs.count() - offset)) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < ordinals.docID()) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -777,9 +777,9 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count())) {
-                for (int i = 0; i < docs.count(); i++) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count() - offset)) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < docID) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -876,9 +876,9 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BlockLoader.FloatBuilder builder = factory.denseVectors(docs.count(), dimensions)) {
-                for (int i = 0; i < docs.count(); i++) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BlockLoader.FloatBuilder builder = factory.denseVectors(docs.count() - offset, dimensions)) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < docID) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -960,10 +960,10 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BlockLoader.BooleanBuilder builder = factory.booleansFromDocValues(docs.count())) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BlockLoader.BooleanBuilder builder = factory.booleansFromDocValues(docs.count() - offset)) {
                 int lastDoc = -1;
-                for (int i = 0; i < docs.count(); i++) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < lastDoc) {
                         throw new IllegalStateException("docs within same block must be in order");
@@ -1009,9 +1009,9 @@ public abstract class BlockDocValuesReader implements BlockLoader.AllReader {
         }
 
         @Override
-        public BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException {
-            try (BlockLoader.BooleanBuilder builder = factory.booleansFromDocValues(docs.count())) {
-                for (int i = 0; i < docs.count(); i++) {
+        public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            try (BlockLoader.BooleanBuilder builder = factory.booleansFromDocValues(docs.count() - offset)) {
+                for (int i = offset; i < docs.count(); i++) {
                     int doc = docs.get(i);
                     if (doc < this.docID) {
                         throw new IllegalStateException("docs within same block must be in order");

+ 9 - 9
server/src/main/java/org/elasticsearch/index/mapper/BlockLoader.java

@@ -43,7 +43,7 @@ public interface BlockLoader {
         /**
          * Reads the values of all documents in {@code docs}.
          */
-        BlockLoader.Block read(BlockFactory factory, Docs docs) throws IOException;
+        BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException;
     }
 
     interface RowStrideReader extends Reader {
@@ -149,8 +149,8 @@ public interface BlockLoader {
      */
     class ConstantNullsReader implements AllReader {
         @Override
-        public Block read(BlockFactory factory, Docs docs) throws IOException {
-            return factory.constantNulls();
+        public Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+            return factory.constantNulls(docs.count() - offset);
         }
 
         @Override
@@ -183,8 +183,8 @@ public interface BlockLoader {
             public ColumnAtATimeReader columnAtATimeReader(LeafReaderContext context) {
                 return new ColumnAtATimeReader() {
                     @Override
-                    public Block read(BlockFactory factory, Docs docs) {
-                        return factory.constantBytes(value);
+                    public Block read(BlockFactory factory, Docs docs, int offset) {
+                        return factory.constantBytes(value, docs.count() - offset);
                     }
 
                     @Override
@@ -261,8 +261,8 @@ public interface BlockLoader {
             }
             return new ColumnAtATimeReader() {
                 @Override
-                public Block read(BlockFactory factory, Docs docs) throws IOException {
-                    return reader.read(factory, docs);
+                public Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+                    return reader.read(factory, docs, offset);
                 }
 
                 @Override
@@ -408,13 +408,13 @@ public interface BlockLoader {
         /**
          * Build a block that contains only {@code null}.
          */
-        Block constantNulls();
+        Block constantNulls(int count);
 
         /**
          * Build a block that contains {@code value} repeated
          * {@code size} times.
          */
-        Block constantBytes(BytesRef value);
+        Block constantBytes(BytesRef value, int count);
 
         /**
          * Build a reader for reading keyword ordinals.
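
For readers implementing the changed interface, the contract implied by the
call sites above is: build `docs.count() - offset` values, reading positions
`offset` through `docs.count() - 1`. A hedged sketch of a conforming
column-at-a-time reader, where `valueFor` is a made-up helper standing in
for whatever actually fetches a document's value:

```java
@Override
public BlockLoader.Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
    // Build exactly docs.count() - offset values, skipping the rows that
    // the row-stride pass already loaded.
    try (BlockLoader.LongBuilder builder = factory.longs(docs.count() - offset)) {
        for (int i = offset; i < docs.count(); i++) {
            builder.appendLong(valueFor(docs.get(i))); // valueFor: illustrative only
        }
        return builder.build();
    }
}
```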

+ 3 - 3
server/src/main/java/org/elasticsearch/index/mapper/BooleanScriptBlockDocValuesReader.java

@@ -49,10 +49,10 @@ public class BooleanScriptBlockDocValuesReader extends BlockDocValuesReader {
     }
 
     @Override
-    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException {
+    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
         // Note that we don't emit falses before trues so we conform to the doc values contract and can use booleansFromDocValues
-        try (BlockLoader.BooleanBuilder builder = factory.booleans(docs.count())) {
-            for (int i = 0; i < docs.count(); i++) {
+        try (BlockLoader.BooleanBuilder builder = factory.booleans(docs.count() - offset)) {
+            for (int i = offset; i < docs.count(); i++) {
                 read(docs.get(i), builder);
             }
             return builder.build();

+ 3 - 3
server/src/main/java/org/elasticsearch/index/mapper/DateScriptBlockDocValuesReader.java

@@ -49,10 +49,10 @@ public class DateScriptBlockDocValuesReader extends BlockDocValuesReader {
     }
 
     @Override
-    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException {
+    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
         // Note that we don't sort the values sort, so we can't use factory.longsFromDocValues
-        try (BlockLoader.LongBuilder builder = factory.longs(docs.count())) {
-            for (int i = 0; i < docs.count(); i++) {
+        try (BlockLoader.LongBuilder builder = factory.longs(docs.count() - offset)) {
+            for (int i = offset; i < docs.count(); i++) {
                 read(docs.get(i), builder);
             }
             return builder.build();

+ 3 - 3
server/src/main/java/org/elasticsearch/index/mapper/DoubleScriptBlockDocValuesReader.java

@@ -49,10 +49,10 @@ public class DoubleScriptBlockDocValuesReader extends BlockDocValuesReader {
     }
 
     @Override
-    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException {
+    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
         // Note that we don't sort the values sort, so we can't use factory.doublesFromDocValues
-        try (BlockLoader.DoubleBuilder builder = factory.doubles(docs.count())) {
-            for (int i = 0; i < docs.count(); i++) {
+        try (BlockLoader.DoubleBuilder builder = factory.doubles(docs.count() - offset)) {
+            for (int i = offset; i < docs.count(); i++) {
                 read(docs.get(i), builder);
             }
             return builder.build();

+ 3 - 3
server/src/main/java/org/elasticsearch/index/mapper/IpScriptBlockDocValuesReader.java

@@ -49,10 +49,10 @@ public class IpScriptBlockDocValuesReader extends BlockDocValuesReader {
     }
 
     @Override
-    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException {
+    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
         // Note that we don't pre-sort our output so we can't use bytesRefsFromDocValues
-        try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count())) {
-            for (int i = 0; i < docs.count(); i++) {
+        try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count() - offset)) {
+            for (int i = offset; i < docs.count(); i++) {
                 read(docs.get(i), builder);
             }
             return builder.build();

+ 3 - 3
server/src/main/java/org/elasticsearch/index/mapper/KeywordScriptBlockDocValuesReader.java

@@ -51,10 +51,10 @@ public class KeywordScriptBlockDocValuesReader extends BlockDocValuesReader {
     }
 
     @Override
-    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException {
+    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
         // Note that we don't pre-sort our output so we can't use bytesRefsFromDocValues
-        try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count())) {
-            for (int i = 0; i < docs.count(); i++) {
+        try (BlockLoader.BytesRefBuilder builder = factory.bytesRefs(docs.count() - offset)) {
+            for (int i = offset; i < docs.count(); i++) {
                 read(docs.get(i), builder);
             }
             return builder.build();

+ 3 - 3
server/src/main/java/org/elasticsearch/index/mapper/LongScriptBlockDocValuesReader.java

@@ -49,10 +49,10 @@ public class LongScriptBlockDocValuesReader extends BlockDocValuesReader {
     }
 
     @Override
-    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs) throws IOException {
+    public BlockLoader.Block read(BlockLoader.BlockFactory factory, BlockLoader.Docs docs, int offset) throws IOException {
         // Note that we don't pre-sort our output so we can't use longsFromDocValues
-        try (BlockLoader.LongBuilder builder = factory.longs(docs.count())) {
-            for (int i = 0; i < docs.count(); i++) {
+        try (BlockLoader.LongBuilder builder = factory.longs(docs.count() - offset)) {
+            for (int i = offset; i < docs.count(); i++) {
                 read(docs.get(i), builder);
             }
             return builder.build();

+ 1 - 1
server/src/test/java/org/elasticsearch/index/mapper/AbstractShapeGeometryFieldMapperTests.java

@@ -125,7 +125,7 @@ public class AbstractShapeGeometryFieldMapperTests extends ESTestCase {
                     for (int j : array) {
                         expected.add(visitor.apply(geometries.get(j + currentIndex)).get());
                     }
-                    try (var block = (TestBlock) loader.reader(leaf).read(TestBlock.factory(leafReader.numDocs()), TestBlock.docs(array))) {
+                    try (var block = (TestBlock) loader.reader(leaf).read(TestBlock.factory(), TestBlock.docs(array), 0)) {
                         for (int i = 0; i < block.size(); i++) {
                             intArrayResults.add(block.get(i));
                         }

+ 1 - 1
server/src/test/java/org/elasticsearch/index/mapper/BlockSourceReaderTests.java

@@ -59,7 +59,7 @@ public class BlockSourceReaderTests extends MapperServiceTestCase {
             StoredFieldLoader.fromSpec(loader.rowStrideStoredFieldSpec()).getLoader(ctx, null),
             loader.rowStrideStoredFieldSpec().requiresSource() ? SourceLoader.FROM_STORED_SOURCE.leaf(ctx.reader(), null) : null
         );
-        BlockLoader.Builder builder = loader.builder(TestBlock.factory(ctx.reader().numDocs()), 1);
+        BlockLoader.Builder builder = loader.builder(TestBlock.factory(), 1);
         storedFields.advanceTo(0);
         reader.read(0, storedFields, builder);
         TestBlock block = (TestBlock) builder.build();

+ 2 - 1
server/src/test/java/org/elasticsearch/index/mapper/BooleanScriptFieldTypeTests.java

@@ -438,7 +438,8 @@ public class BooleanScriptFieldTypeTests extends AbstractNonTextScriptFieldTypeT
             try (DirectoryReader reader = iw.getReader()) {
                 BooleanScriptFieldType fieldType = build("xor_param", Map.of("param", false), OnScriptError.FAIL);
                 List<Boolean> expected = List.of(false, true);
-                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType), equalTo(expected));
+                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0), equalTo(expected));
+                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(expected.subList(1, 2)));
                 assertThat(blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(expected));
             }
         }

+ 2 - 1
server/src/test/java/org/elasticsearch/index/mapper/DateScriptFieldTypeTests.java

@@ -493,9 +493,10 @@ public class DateScriptFieldTypeTests extends AbstractNonTextScriptFieldTypeTest
             try (DirectoryReader reader = iw.getReader()) {
                 DateScriptFieldType fieldType = build("add_days", Map.of("days", 1), OnScriptError.FAIL);
                 assertThat(
-                    blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType),
+                    blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0),
                     equalTo(List.of(1595518581354L, 1595518581355L))
                 );
+                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(List.of(1595518581355L)));
                 assertThat(blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(List.of(1595518581354L, 1595518581355L)));
             }
         }

+ 2 - 1
server/src/test/java/org/elasticsearch/index/mapper/DoubleScriptFieldTypeTests.java

@@ -251,7 +251,8 @@ public class DoubleScriptFieldTypeTests extends AbstractNonTextScriptFieldTypeTe
             );
             try (DirectoryReader reader = iw.getReader()) {
                 DoubleScriptFieldType fieldType = build("add_param", Map.of("param", 1), OnScriptError.FAIL);
-                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType), equalTo(List.of(2d, 3d)));
+                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0), equalTo(List.of(2d, 3d)));
+                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(List.of(3d)));
                 assertThat(blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(List.of(2d, 3d)));
             }
         }

+ 2 - 1
server/src/test/java/org/elasticsearch/index/mapper/IpScriptFieldTypeTests.java

@@ -271,7 +271,8 @@ public class IpScriptFieldTypeTests extends AbstractScriptFieldTypeTestCase {
                     new BytesRef(InetAddressPoint.encode(InetAddresses.forString("192.168.0.1"))),
                     new BytesRef(InetAddressPoint.encode(InetAddresses.forString("192.168.1.1")))
                 );
-                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType), equalTo(expected));
+                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0), equalTo(expected));
+                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(expected.subList(1, 2)));
                 assertThat(blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(expected));
             }
         }

+ 2 - 1
server/src/test/java/org/elasticsearch/index/mapper/KeywordScriptFieldTypeTests.java

@@ -398,9 +398,10 @@ public class KeywordScriptFieldTypeTests extends AbstractScriptFieldTypeTestCase
             try (DirectoryReader reader = iw.getReader()) {
                 KeywordScriptFieldType fieldType = build("append_param", Map.of("param", "-Suffix"), OnScriptError.FAIL);
                 assertThat(
-                    blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType),
+                    blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0),
                     equalTo(List.of(new BytesRef("1-Suffix"), new BytesRef("2-Suffix")))
                 );
+                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(List.of(new BytesRef("2-Suffix"))));
                 assertThat(
                     blockLoaderReadValuesFromRowStrideReader(reader, fieldType),
                     equalTo(List.of(new BytesRef("1-Suffix"), new BytesRef("2-Suffix")))

+ 2 - 1
server/src/test/java/org/elasticsearch/index/mapper/LongScriptFieldTypeTests.java

@@ -284,7 +284,8 @@ public class LongScriptFieldTypeTests extends AbstractNonTextScriptFieldTypeTest
             );
             try (DirectoryReader reader = iw.getReader()) {
                 LongScriptFieldType fieldType = build("add_param", Map.of("param", 1), OnScriptError.FAIL);
-                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType), equalTo(List.of(2L, 3L)));
+                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 0), equalTo(List.of(2L, 3L)));
+                assertThat(blockLoaderReadValuesFromColumnAtATimeReader(reader, fieldType, 1), equalTo(List.of(3L)));
                 assertThat(blockLoaderReadValuesFromRowStrideReader(reader, fieldType), equalTo(List.of(2L, 3L)));
             }
         }

+ 1 - 0
test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/Clusters.java

@@ -22,6 +22,7 @@ public class Clusters {
             .setting("xpack.security.enabled", "false")
             .setting("xpack.license.self_generated.type", "trial")
             .setting("esql.query.allow_partial_results", "false")
+            .setting("logger.org.elasticsearch.compute.lucene.read", "DEBUG")
             .jvmArg("-Xmx512m");
         String javaVersion = JvmInfo.jvmInfo().version();
         if (javaVersion.equals("20") || javaVersion.equals("21")) {

+ 87 - 4
test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

@@ -568,7 +568,7 @@ public class HeapAttackIT extends ESRestTestCase {
     }
 
     public void testFetchManyBigFields() throws IOException {
-        initManyBigFieldsIndex(100);
+        initManyBigFieldsIndex(100, "keyword");
         Map<?, ?> response = fetchManyBigFields(100);
         ListMatcher columns = matchesList();
         for (int f = 0; f < 1000; f++) {
@@ -578,7 +578,7 @@ public class HeapAttackIT extends ESRestTestCase {
     }
 
     public void testFetchTooManyBigFields() throws IOException {
-        initManyBigFieldsIndex(500);
+        initManyBigFieldsIndex(500, "keyword");
         // 500 docs is plenty to circuit break on most nodes
         assertCircuitBreaks(attempt -> fetchManyBigFields(attempt * 500));
     }
@@ -592,6 +592,58 @@ public class HeapAttackIT extends ESRestTestCase {
         return responseAsMap(query(query.toString(), "columns"));
     }
 
+    public void testAggManyBigTextFields() throws IOException {
+        int docs = 100;
+        int fields = 100;
+        initManyBigFieldsIndex(docs, "text");
+        Map<?, ?> response = aggManyBigFields(fields);
+        ListMatcher columns = matchesList().item(matchesMap().entry("name", "sum").entry("type", "long"));
+        assertMap(
+            response,
+            matchesMap().entry("columns", columns).entry("values", matchesList().item(matchesList().item(1024 * fields * docs)))
+        );
+    }
+
+    /**
+     * Aggregates documents containing many fields which are {@code 1kb} each.
+     */
+    private Map<String, Object> aggManyBigFields(int fields) throws IOException {
+        StringBuilder query = startQuery();
+        query.append("FROM manybigfields | STATS sum = SUM(");
+        query.append("LENGTH(f").append(String.format(Locale.ROOT, "%03d", 0)).append(")");
+        for (int f = 1; f < fields; f++) {
+            query.append(" + LENGTH(f").append(String.format(Locale.ROOT, "%03d", f)).append(")");
+        }
+        query.append(")\"}");
+        return responseAsMap(query(query.toString(), "columns,values"));
+    }
+
+    /**
+     * Aggregates on the {@code LENGTH} of a giant text field. Without
+     * splitting pages on load (#131053) this throws a {@link CircuitBreakingException}
+     * when it tries to load a giant field. With that change it finishes
+     * after loading many single-row pages.
+     */
+    public void testAggGiantTextField() throws IOException {
+        int docs = 100;
+        initGiantTextField(docs);
+        Map<?, ?> response = aggGiantTextField();
+        ListMatcher columns = matchesList().item(matchesMap().entry("name", "sum").entry("type", "long"));
+        assertMap(
+            response,
+            matchesMap().entry("columns", columns).entry("values", matchesList().item(matchesList().item(1024 * 1024 * 5 * docs)))
+        );
+    }
+
+    /**
+     * Aggregates documents containing a text field that is {@code 1mb} each.
+     */
+    private Map<String, Object> aggGiantTextField() throws IOException {
+        StringBuilder query = startQuery();
+        query.append("FROM bigtext | STATS sum = SUM(LENGTH(f))\"}");
+        return responseAsMap(query(query.toString(), "columns,values"));
+    }
+
     public void testAggMvLongs() throws IOException {
         int fieldValues = 100;
         initMvLongsIndex(1, 3, fieldValues);
@@ -786,7 +838,7 @@ public class HeapAttackIT extends ESRestTestCase {
             """);
     }
 
-    private void initManyBigFieldsIndex(int docs) throws IOException {
+    private void initManyBigFieldsIndex(int docs, String type) throws IOException {
         logger.info("loading many documents with many big fields");
         int docsPerBulk = 5;
         int fields = 1000;
@@ -797,7 +849,7 @@ public class HeapAttackIT extends ESRestTestCase {
         config.startObject("settings").field("index.mapping.total_fields.limit", 10000).endObject();
         config.startObject("mappings").startObject("properties");
         for (int f = 0; f < fields; f++) {
-            config.startObject("f" + String.format(Locale.ROOT, "%03d", f)).field("type", "keyword").endObject();
+            config.startObject("f" + String.format(Locale.ROOT, "%03d", f)).field("type", type).endObject();
         }
         config.endObject().endObject();
         request.setJsonEntity(Strings.toString(config.endObject()));
@@ -829,6 +881,37 @@ public class HeapAttackIT extends ESRestTestCase {
         initIndex("manybigfields", bulk.toString());
     }
 
+    private void initGiantTextField(int docs) throws IOException {
+        logger.info("loading many documents with one big text field");
+        int docsPerBulk = 3;
+        int fieldSize = Math.toIntExact(ByteSizeValue.ofMb(5).getBytes());
+
+        Request request = new Request("PUT", "/bigtext");
+        XContentBuilder config = JsonXContent.contentBuilder().startObject();
+        config.startObject("mappings").startObject("properties");
+        config.startObject("f").field("type", "text").endObject();
+        config.endObject().endObject();
+        request.setJsonEntity(Strings.toString(config.endObject()));
+        Response response = client().performRequest(request);
+        assertThat(
+            EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8),
+            equalTo("{\"acknowledged\":true,\"shards_acknowledged\":true,\"index\":\"bigtext\"}")
+        );
+
+        StringBuilder bulk = new StringBuilder();
+        for (int d = 0; d < docs; d++) {
+            bulk.append("{\"create\":{}}\n");
+            bulk.append("{\"f\":\"");
+            bulk.append(Integer.toString(d % 10).repeat(fieldSize));
+            bulk.append("\"}\n");
+            if (d % docsPerBulk == docsPerBulk - 1 && d != docs - 1) {
+                bulk("bigtext", bulk.toString());
+                bulk.setLength(0);
+            }
+        }
+        initIndex("bigtext", bulk.toString());
+    }
+
     private void initMvLongsIndex(int docs, int fields, int fieldValues) throws IOException {
         logger.info("loading documents with many multivalued longs");
         int docsPerBulk = 100;

+ 3 - 4
test/framework/src/main/java/org/elasticsearch/index/mapper/AbstractScriptFieldTypeTestCase.java

@@ -420,13 +420,12 @@ public abstract class AbstractScriptFieldTypeTestCase extends MapperServiceTestC
         }
     }
 
-    protected final List<Object> blockLoaderReadValuesFromColumnAtATimeReader(DirectoryReader reader, MappedFieldType fieldType)
+    protected final List<Object> blockLoaderReadValuesFromColumnAtATimeReader(DirectoryReader reader, MappedFieldType fieldType, int offset)
         throws IOException {
         BlockLoader loader = fieldType.blockLoader(blContext());
         List<Object> all = new ArrayList<>();
         for (LeafReaderContext ctx : reader.leaves()) {
-            TestBlock block = (TestBlock) loader.columnAtATimeReader(ctx)
-                .read(TestBlock.factory(ctx.reader().numDocs()), TestBlock.docs(ctx));
+            TestBlock block = (TestBlock) loader.columnAtATimeReader(ctx).read(TestBlock.factory(), TestBlock.docs(ctx), offset);
             for (int i = 0; i < block.size(); i++) {
                 all.add(block.get(i));
             }
@@ -440,7 +439,7 @@ public abstract class AbstractScriptFieldTypeTestCase extends MapperServiceTestC
         List<Object> all = new ArrayList<>();
         for (LeafReaderContext ctx : reader.leaves()) {
             BlockLoader.RowStrideReader blockReader = loader.rowStrideReader(ctx);
-            BlockLoader.Builder builder = loader.builder(TestBlock.factory(ctx.reader().numDocs()), ctx.reader().numDocs());
+            BlockLoader.Builder builder = loader.builder(TestBlock.factory(), ctx.reader().numDocs());
             for (int i = 0; i < ctx.reader().numDocs(); i++) {
                 blockReader.read(i, null, builder);
             }

+ 36 - 7
test/framework/src/main/java/org/elasticsearch/index/mapper/BlockLoaderTestRunner.java

@@ -36,6 +36,8 @@ import java.util.Set;
 import static org.apache.lucene.tests.util.LuceneTestCase.newDirectory;
 import static org.apache.lucene.tests.util.LuceneTestCase.random;
 import static org.elasticsearch.index.mapper.BlockLoaderTestRunner.PrettyEqual.prettyEqualTo;
+import static org.elasticsearch.test.ESTestCase.between;
+import static org.elasticsearch.test.ESTestCase.randomBoolean;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -69,7 +71,11 @@ public class BlockLoaderTestRunner {
             );
             LuceneDocument doc = mapperService.documentMapper().parse(source).rootDoc();
 
-            iw.addDocument(doc);
+            /*
+             * Add three documents with doc id 0, 1, 2. The real document is 1.
+             * The other two are empty documents.
+             */
+            iw.addDocuments(List.of(List.of(), doc, List.of()));
             iw.close();
 
             try (DirectoryReader reader = DirectoryReader.open(directory)) {
@@ -83,9 +89,32 @@ public class BlockLoaderTestRunner {
         // `columnAtATimeReader` is tried first, we mimic `ValuesSourceReaderOperator`
         var columnAtATimeReader = blockLoader.columnAtATimeReader(context);
         if (columnAtATimeReader != null) {
-            BlockLoader.Docs docs = TestBlock.docs(0);
-            var block = (TestBlock) columnAtATimeReader.read(TestBlock.factory(context.reader().numDocs()), docs);
-            assertThat(block.size(), equalTo(1));
+            int[] docArray;
+            int offset;
+            if (randomBoolean()) {
+                // Half the time we load a single document. Nice and simple.
+                docArray = new int[] { 1 };
+                offset = 0;
+            } else {
+                /*
+                 * The other half the time we emulate loading a larger page,
+                 * starting part way through the page.
+                 */
+                docArray = new int[between(2, 10)];
+                offset = between(0, docArray.length - 1);
+                for (int i = 0; i < docArray.length; i++) {
+                    if (i < offset) {
+                        docArray[i] = 0;
+                    } else if (i == offset) {
+                        docArray[i] = 1;
+                    } else {
+                        docArray[i] = 2;
+                    }
+                }
+            }
+            BlockLoader.Docs docs = TestBlock.docs(docArray);
+            var block = (TestBlock) columnAtATimeReader.read(TestBlock.factory(), docs, offset);
+            assertThat(block.size(), equalTo(docArray.length - offset));
             return block.get(0);
         }
 
@@ -102,10 +131,10 @@ public class BlockLoaderTestRunner {
             StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(context, null),
             leafSourceLoader
         );
-        storedFieldsLoader.advanceTo(0);
+        storedFieldsLoader.advanceTo(1);
 
-        BlockLoader.Builder builder = blockLoader.builder(TestBlock.factory(context.reader().numDocs()), 1);
-        blockLoader.rowStrideReader(context).read(0, storedFieldsLoader, builder);
+        BlockLoader.Builder builder = blockLoader.builder(TestBlock.factory(), 1);
+        blockLoader.rowStrideReader(context).read(1, storedFieldsLoader, builder);
         var block = (TestBlock) builder.build();
         assertThat(block.size(), equalTo(1));
 

+ 80 - 15
test/framework/src/main/java/org/elasticsearch/index/mapper/TestBlock.java

@@ -12,6 +12,7 @@ package org.elasticsearch.index.mapper;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedDocValues;
 import org.apache.lucene.util.BytesRef;
+import org.hamcrest.Matcher;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -19,11 +20,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
+import static org.elasticsearch.test.ESTestCase.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 public class TestBlock implements BlockLoader.Block {
-    public static BlockLoader.BlockFactory factory(int pageSize) {
+    public static BlockLoader.BlockFactory factory() {
         return new BlockLoader.BlockFactory() {
             @Override
             public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
@@ -33,6 +37,10 @@ public class TestBlock implements BlockLoader.Block {
             @Override
             public BlockLoader.BooleanBuilder booleans(int expectedCount) {
                 class BooleansBuilder extends TestBlock.Builder implements BlockLoader.BooleanBuilder {
+                    private BooleansBuilder() {
+                        super(expectedCount);
+                    }
+
                     @Override
                     public BooleansBuilder appendBoolean(boolean value) {
                         add(value);
@@ -44,12 +52,27 @@ public class TestBlock implements BlockLoader.Block {
 
             @Override
             public BlockLoader.BytesRefBuilder bytesRefsFromDocValues(int expectedCount) {
-                return bytesRefs(expectedCount);
+                class BytesRefsFromDocValuesBuilder extends TestBlock.Builder implements BlockLoader.BytesRefBuilder {
+                    private BytesRefsFromDocValuesBuilder() {
+                        super(expectedCount);
+                    }
+
+                    @Override
+                    public BytesRefsFromDocValuesBuilder appendBytesRef(BytesRef value) {
+                        add(BytesRef.deepCopyOf(value));
+                        return this;
+                    }
+                }
+                return new BytesRefsFromDocValuesBuilder();
             }
 
             @Override
             public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) {
                 class BytesRefsBuilder extends TestBlock.Builder implements BlockLoader.BytesRefBuilder {
+                    private BytesRefsBuilder() {
+                        super(expectedCount);
+                    }
+
                     @Override
                     public BytesRefsBuilder appendBytesRef(BytesRef value) {
                         add(BytesRef.deepCopyOf(value));
@@ -67,6 +90,10 @@ public class TestBlock implements BlockLoader.Block {
             @Override
             public BlockLoader.DoubleBuilder doubles(int expectedCount) {
                 class DoublesBuilder extends TestBlock.Builder implements BlockLoader.DoubleBuilder {
+                    private DoublesBuilder() {
+                        super(expectedCount);
+                    }
+
                     @Override
                     public DoublesBuilder appendDouble(double value) {
                         add(value);
@@ -81,6 +108,10 @@ public class TestBlock implements BlockLoader.Block {
                 class FloatsBuilder extends TestBlock.Builder implements BlockLoader.FloatBuilder {
                     int numElements = 0;
 
+                    private FloatsBuilder() {
+                        super(expectedCount);
+                    }
+
                     @Override
                     public BlockLoader.FloatBuilder appendFloat(float value) {
                         add(value);
@@ -117,6 +148,10 @@ public class TestBlock implements BlockLoader.Block {
             @Override
             public BlockLoader.IntBuilder ints(int expectedCount) {
                 class IntsBuilder extends TestBlock.Builder implements BlockLoader.IntBuilder {
+                    private IntsBuilder() {
+                        super(expectedCount);
+                    }
+
                     @Override
                     public IntsBuilder appendInt(int value) {
                         add(value);
@@ -134,6 +169,10 @@ public class TestBlock implements BlockLoader.Block {
             @Override
             public BlockLoader.LongBuilder longs(int expectedCount) {
                 class LongsBuilder extends TestBlock.Builder implements BlockLoader.LongBuilder {
+                    private LongsBuilder() {
+                        super(expectedCount);
+                    }
+
                     @Override
                     public LongsBuilder appendLong(long value) {
                         add(value);
@@ -149,26 +188,30 @@ public class TestBlock implements BlockLoader.Block {
             }
 
             @Override
-            public BlockLoader.Block constantNulls() {
-                BlockLoader.LongBuilder builder = longs(pageSize);
-                for (int i = 0; i < pageSize; i++) {
+            public BlockLoader.Block constantNulls(int count) {
+                BlockLoader.LongBuilder builder = longs(count);
+                for (int i = 0; i < count; i++) {
                     builder.appendNull();
                 }
                 return builder.build();
             }
 
             @Override
-            public BlockLoader.Block constantBytes(BytesRef value) {
-                BlockLoader.BytesRefBuilder builder = bytesRefs(pageSize);
-                for (int i = 0; i < pageSize; i++) {
+            public BlockLoader.Block constantBytes(BytesRef value, int count) {
+                BlockLoader.BytesRefBuilder builder = bytesRefs(count);
+                for (int i = 0; i < count; i++) {
                     builder.appendBytesRef(value);
                 }
                 return builder.build();
             }
 
             @Override
-            public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
+            public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int expectedCount) {
                 class SingletonOrdsBuilder extends TestBlock.Builder implements BlockLoader.SingletonOrdinalsBuilder {
+                    private SingletonOrdsBuilder() {
+                        super(expectedCount);
+                    }
+
                     @Override
                     public SingletonOrdsBuilder appendOrd(int value) {
                         try {
@@ -183,8 +226,8 @@ public class TestBlock implements BlockLoader.Block {
             }
 
             @Override
-            public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) {
-                return new AggregateMetricDoubleBlockBuilder();
+            public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int expectedSize) {
+                return new AggregateMetricDoubleBlockBuilder(expectedSize);
             }
         };
     }
@@ -239,8 +282,14 @@ public class TestBlock implements BlockLoader.Block {
     private abstract static class Builder implements BlockLoader.Builder {
         private final List<Object> values = new ArrayList<>();
 
+        private Matcher<Integer> expectedSize;
+
         private List<Object> currentPosition = null;
 
+        private Builder(int expectedSize) {
+            this.expectedSize = equalTo(expectedSize);
+        }
+
         @Override
         public Builder appendNull() {
             assertNull(currentPosition);
@@ -269,6 +318,7 @@ public class TestBlock implements BlockLoader.Block {
 
         @Override
         public TestBlock build() {
+            assertThat(values, hasSize(expectedSize));
             return new TestBlock(values);
         }
 
@@ -283,12 +333,23 @@ public class TestBlock implements BlockLoader.Block {
      * The implementation here is fairly close to the production one.
      */
     private static class AggregateMetricDoubleBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder {
-        private final DoubleBuilder min = new DoubleBuilder();
-        private final DoubleBuilder max = new DoubleBuilder();
-        private final DoubleBuilder sum = new DoubleBuilder();
-        private final IntBuilder count = new IntBuilder();
+        private final DoubleBuilder min;
+        private final DoubleBuilder max;
+        private final DoubleBuilder sum;
+        private final IntBuilder count;
+
+        private AggregateMetricDoubleBlockBuilder(int expectedSize) {
+            min = new DoubleBuilder(expectedSize);
+            max = new DoubleBuilder(expectedSize);
+            sum = new DoubleBuilder(expectedSize);
+            count = new IntBuilder(expectedSize);
+        }
 
         private static class DoubleBuilder extends TestBlock.Builder implements BlockLoader.DoubleBuilder {
+            private DoubleBuilder(int expectedSize) {
+                super(expectedSize);
+            }
+
             @Override
             public BlockLoader.DoubleBuilder appendDouble(double value) {
                 add(value);
@@ -297,6 +358,10 @@ public class TestBlock implements BlockLoader.Block {
         }
 
         private static class IntBuilder extends TestBlock.Builder implements BlockLoader.IntBuilder {
+            private IntBuilder(int expectedSize) {
+                super(expectedSize);
+            }
+
             @Override
             public BlockLoader.IntBuilder appendInt(int value) {
                 add(value);
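
The `Builder` changes above thread an expected size through every concrete builder and verify it at `build()` time. A condensed, self-contained sketch of that pattern (toy class with plain Hamcrest; the real `TestBlock.Builder` also tracks multivalue positions):

```
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

import java.util.ArrayList;
import java.util.List;

class SizedBuilderSketch {
    private final List<Object> values = new ArrayList<>();
    private final int expectedSize;

    SizedBuilderSketch(int expectedSize) {
        this.expectedSize = expectedSize;
    }

    SizedBuilderSketch add(Object value) {
        values.add(value);
        return this;
    }

    List<Object> build() {
        // Fails the test if the loader appended the wrong number of positions.
        assertThat(values, hasSize(expectedSize));
        return List.copyOf(values);
    }
}
```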

+ 0 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocVector.java

@@ -107,7 +107,6 @@ public final class DocVector extends AbstractVector implements Vector {
             prev = v;
         }
         return true;
-
     }
 
     /**

+ 5 - 7
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java

@@ -14,18 +14,16 @@ import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.core.Releasable;
 
 class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
-    private final int pageSize;
     private Block nullBlock;
 
-    ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) {
+    ComputeBlockLoaderFactory(BlockFactory factory) {
         super(factory);
-        this.pageSize = pageSize;
     }
 
     @Override
-    public Block constantNulls() {
+    public Block constantNulls(int count) {
         if (nullBlock == null) {
-            nullBlock = factory.newConstantNullBlock(pageSize);
+            nullBlock = factory.newConstantNullBlock(count);
         }
         nullBlock.incRef();
         return nullBlock;
@@ -39,7 +37,7 @@ class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements
     }
 
     @Override
-    public BytesRefBlock constantBytes(BytesRef value) {
-        return factory.newConstantBytesRefBlockWith(value, pageSize);
+    public BytesRefBlock constantBytes(BytesRef value, int count) {
+        return factory.newConstantBytesRefBlockWith(value, count);
     }
 }
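
Dropping `pageSize` from the factory means constant blocks are sized per call, which is what lets a split page be shorter than the incoming one. A toy sketch of the null-block caching above (stand-in ref counting; this assumes, as the production code appears to, that every call within one factory's lifetime uses the same count):

```
import java.util.concurrent.atomic.AtomicInteger;

class NullBlockCacheSketch {
    /** Stand-in for a ref-counted Block. */
    static final class FakeBlock {
        final int positions;
        final AtomicInteger refs = new AtomicInteger(1);

        FakeBlock(int positions) {
            this.positions = positions;
        }
    }

    private FakeBlock nullBlock;

    /** First call allocates; later calls share the cached block via a ref bump. */
    FakeBlock constantNulls(int count) {
        if (nullBlock == null) {
            nullBlock = new FakeBlock(count);
        }
        nullBlock.refs.incrementAndGet();
        return nullBlock;
    }
}
```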

+ 33 - 4
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java

@@ -16,6 +16,8 @@ import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
 import org.elasticsearch.index.mapper.BlockLoader;
 import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
 import org.elasticsearch.index.mapper.SourceLoader;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
 import org.elasticsearch.search.fetch.StoredFieldsSpec;
 
 import java.io.IOException;
@@ -24,6 +26,8 @@ import java.io.IOException;
 * Loads values from many leaves. Much less efficient than {@link ValuesFromSingleReader}.
  */
 class ValuesFromManyReader extends ValuesReader {
+    private static final Logger log = LogManager.getLogger(ValuesFromManyReader.class);
+
     private final int[] forwards;
     private final int[] backwards;
     private final BlockLoader.RowStrideReader[] rowStride;
@@ -35,6 +39,7 @@ class ValuesFromManyReader extends ValuesReader {
         forwards = docs.shardSegmentDocMapForwards();
         backwards = docs.shardSegmentDocMapBackwards();
         rowStride = new BlockLoader.RowStrideReader[operator.fields.length];
+        log.debug("initializing {} positions", docs.getPositionCount());
     }
 
     @Override
@@ -70,9 +75,7 @@ class ValuesFromManyReader extends ValuesReader {
                 builders[f] = new Block.Builder[operator.shardContexts.size()];
                 converters[f] = new BlockLoader[operator.shardContexts.size()];
             }
-            try (
-                ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.getPositionCount())
-            ) {
+            try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory)) {
                 int p = forwards[offset];
                 int shard = docs.shards().getInt(p);
                 int segment = docs.segments().getInt(p);
@@ -84,7 +87,9 @@ class ValuesFromManyReader extends ValuesReader {
                 read(firstDoc, shard);
 
                 int i = offset + 1;
-                while (i < forwards.length) {
+                long estimated = estimatedRamBytesUsed();
+                long dangerZoneBytes = Long.MAX_VALUE; // TODO danger_zone if ascending
+                while (i < forwards.length && estimated < dangerZoneBytes) {
                     p = forwards[i];
                     shard = docs.shards().getInt(p);
                     segment = docs.segments().getInt(p);
@@ -96,8 +101,17 @@ class ValuesFromManyReader extends ValuesReader {
                     verifyBuilders(loaderBlockFactory, shard);
                     read(docs.docs().getInt(p), shard);
                     i++;
+                    estimated = estimatedRamBytesUsed();
+                    log.trace("{}: bytes loaded {}/{}", p, estimated, dangerZoneBytes);
                 }
                 buildBlocks();
+                if (log.isDebugEnabled()) {
+                    long actual = 0;
+                    for (Block b : target) {
+                        actual += b.ramBytesUsed();
+                    }
+                    log.debug("loaded {} positions total estimated/actual {}/{} bytes", p, estimated, actual);
+                }
             }
         }
 
@@ -115,6 +129,9 @@ class ValuesFromManyReader extends ValuesReader {
                 }
                 operator.sanityCheckBlock(rowStride[f], backwards.length, target[f], f);
             }
+            if (target[0].getPositionCount() != docs.getPositionCount()) {
+                throw new IllegalStateException("partial pages not yet supported");
+            }
         }
 
         private void verifyBuilders(ComputeBlockLoaderFactory loaderBlockFactory, int shard) {
@@ -141,6 +158,18 @@ class ValuesFromManyReader extends ValuesReader {
                 Releasables.closeExpectNoException(builders[f]);
             }
         }
+
+        private long estimatedRamBytesUsed() {
+            long estimated = 0;
+            for (Block.Builder[] builders : this.builders) {
+                for (Block.Builder builder : builders) {
+                    if (builder != null) {
+                        estimated += builder.estimatedBytes();
+                    }
+                }
+            }
+            return estimated;
+        }
     }
 
     private void fieldsMoved(LeafReaderContext ctx, int shard) throws IOException {
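
Both readers share this loop shape for bounding memory: read one row into every builder, re-estimate, and stop once the estimate crosses the limit (still `Long.MAX_VALUE` here, per the TODO above). A minimal sketch with stand-in builder types:

```
import java.util.List;

class JumboLoopSketch {
    interface RowBuilder {
        void readRow(int doc);

        long estimatedBytes();
    }

    /** Reads rows until input is exhausted or the estimate crosses the limit; returns rows read. */
    static int loadUntilJumbo(List<RowBuilder> builders, int[] docs, int offset, long jumboBytes) {
        int p = offset;
        long estimated = 0;
        while (p < docs.length && estimated < jumboBytes) {
            for (RowBuilder builder : builders) {
                builder.readRow(docs[p]);
            }
            p++;
            estimated = 0;
            for (RowBuilder builder : builders) {
                estimated += builder.estimatedBytes();
            }
        }
        return p - offset;
    }
}
```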

+ 76 - 37
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java

@@ -16,6 +16,8 @@ import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
 import org.elasticsearch.index.mapper.BlockLoader;
 import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
 import org.elasticsearch.index.mapper.SourceLoader;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
 import org.elasticsearch.search.fetch.StoredFieldsSpec;
 
 import java.io.IOException;
@@ -26,6 +28,8 @@ import java.util.List;
  * Loads values from a single leaf. Much more efficient than {@link ValuesFromManyReader}.
  */
 class ValuesFromSingleReader extends ValuesReader {
+    private static final Logger log = LogManager.getLogger(ValuesFromSingleReader.class);
+
     /**
      * Minimum number of documents for which it is more efficient to use a
      * sequential stored field reader when reading stored fields.
@@ -45,39 +49,27 @@ class ValuesFromSingleReader extends ValuesReader {
         super(operator, docs);
         this.shard = docs.shards().getInt(0);
         this.segment = docs.segments().getInt(0);
+        log.debug("initialized {} positions", docs.getPositionCount());
     }
 
     @Override
     protected void load(Block[] target, int offset) throws IOException {
-        assert offset == 0; // TODO allow non-0 offset to support splitting pages
         if (docs.singleSegmentNonDecreasing()) {
-            loadFromSingleLeaf(target, new BlockLoader.Docs() {
-                @Override
-                public int count() {
-                    return docs.getPositionCount();
-                }
-
-                @Override
-                public int get(int i) {
-                    return docs.docs().getInt(i);
-                }
-            });
+            loadFromSingleLeaf(operator.jumboBytes, target, new ValuesReaderDocs(docs), offset);
             return;
         }
+        if (offset != 0) {
+            throw new IllegalStateException("can only load partial pages with single-segment non-decreasing pages");
+        }
         int[] forwards = docs.shardSegmentDocMapForwards();
         Block[] unshuffled = new Block[target.length];
         try {
-            loadFromSingleLeaf(unshuffled, new BlockLoader.Docs() {
-                @Override
-                public int count() {
-                    return docs.getPositionCount();
-                }
-
-                @Override
-                public int get(int i) {
-                    return docs.docs().getInt(forwards[i]);
-                }
-            });
+            loadFromSingleLeaf(
+                Long.MAX_VALUE, // Effectively disable splitting pages when we're not loading in order
+                unshuffled,
+                new ValuesReaderDocs(docs).mapped(forwards),
+                0
+            );
             final int[] backwards = docs.shardSegmentDocMapBackwards();
             for (int i = 0; i < unshuffled.length; i++) {
                 target[i] = unshuffled[i].filter(backwards);
@@ -89,24 +81,25 @@ class ValuesFromSingleReader extends ValuesReader {
         }
     }
 
-    private void loadFromSingleLeaf(Block[] target, BlockLoader.Docs docs) throws IOException {
-        int firstDoc = docs.get(0);
+    private void loadFromSingleLeaf(long jumboBytes, Block[] target, ValuesReaderDocs docs, int offset) throws IOException {
+        int firstDoc = docs.get(offset);
         operator.positionFieldWork(shard, segment, firstDoc);
         StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
-        List<RowStrideReaderWork> rowStrideReaders = new ArrayList<>(operator.fields.length);
         LeafReaderContext ctx = operator.ctx(shard, segment);
-        try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.count())) {
+
+        List<ColumnAtATimeWork> columnAtATimeReaders = new ArrayList<>(operator.fields.length);
+        List<RowStrideReaderWork> rowStrideReaders = new ArrayList<>(operator.fields.length);
+        try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory)) {
             for (int f = 0; f < operator.fields.length; f++) {
                 ValuesSourceReaderOperator.FieldWork field = operator.fields[f];
                 BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx);
                 if (columnAtATime != null) {
-                    target[f] = (Block) columnAtATime.read(loaderBlockFactory, docs);
-                    operator.sanityCheckBlock(columnAtATime, docs.count(), target[f], f);
+                    columnAtATimeReaders.add(new ColumnAtATimeWork(columnAtATime, f));
                 } else {
                     rowStrideReaders.add(
                         new RowStrideReaderWork(
                             field.rowStride(ctx),
-                            (Block.Builder) field.loader.builder(loaderBlockFactory, docs.count()),
+                            (Block.Builder) field.loader.builder(loaderBlockFactory, docs.count() - offset),
                             field.loader,
                             f
                         )
@@ -116,7 +109,18 @@ class ValuesFromSingleReader extends ValuesReader {
             }
 
             if (rowStrideReaders.isEmpty() == false) {
-                loadFromRowStrideReaders(target, storedFieldsSpec, rowStrideReaders, ctx, docs);
+                loadFromRowStrideReaders(jumboBytes, target, storedFieldsSpec, rowStrideReaders, ctx, docs, offset);
+            }
+            for (ColumnAtATimeWork r : columnAtATimeReaders) {
+                target[r.idx] = (Block) r.reader.read(loaderBlockFactory, docs, offset);
+                operator.sanityCheckBlock(r.reader, docs.count() - offset, target[r.idx], r.idx);
+            }
+            if (log.isDebugEnabled()) {
+                long total = 0;
+                for (Block b : target) {
+                    total += b.ramBytesUsed();
+                }
+                log.debug("loaded {} positions total ({} bytes)", target[0].getPositionCount(), total);
             }
         } finally {
             Releasables.close(rowStrideReaders);
@@ -124,11 +128,13 @@ class ValuesFromSingleReader extends ValuesReader {
     }
 
     private void loadFromRowStrideReaders(
+        long jumboBytes,
         Block[] target,
         StoredFieldsSpec storedFieldsSpec,
         List<RowStrideReaderWork> rowStrideReaders,
         LeafReaderContext ctx,
-        BlockLoader.Docs docs
+        ValuesReaderDocs docs,
+        int offset
     ) throws IOException {
         SourceLoader sourceLoader = null;
         ValuesSourceReaderOperator.ShardContext shardContext = operator.shardContexts.get(shard);
@@ -153,18 +159,29 @@ class ValuesFromSingleReader extends ValuesReader {
             storedFieldLoader.getLoader(ctx, null),
             sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null
         );
-        int p = 0;
-        while (p < docs.count()) {
+        int p = offset;
+        long estimated = 0;
+        while (p < docs.count() && estimated < jumboBytes) {
             int doc = docs.get(p++);
             storedFields.advanceTo(doc);
             for (RowStrideReaderWork work : rowStrideReaders) {
                 work.read(doc, storedFields);
             }
+            estimated = estimatedRamBytesUsed(rowStrideReaders);
+            log.trace("{}: bytes loaded {}/{}", p, estimated, jumboBytes);
         }
         for (RowStrideReaderWork work : rowStrideReaders) {
-            target[work.offset] = work.build();
-            operator.sanityCheckBlock(work.reader, p, target[work.offset], work.offset);
+            target[work.idx] = work.build();
+            operator.sanityCheckBlock(work.reader, p - offset, target[work.idx], work.idx);
         }
+        if (log.isDebugEnabled()) {
+            long actual = 0;
+            for (RowStrideReaderWork work : rowStrideReaders) {
+                actual += target[work.idx].ramBytesUsed();
+            }
+            log.debug("loaded {} positions row stride estimated/actual {}/{} bytes", p - offset, estimated, actual);
+        }
+        docs.setCount(p);
     }
 
     /**
@@ -180,7 +197,21 @@ class ValuesFromSingleReader extends ValuesReader {
         return range * storedFieldsSequentialProportion <= count;
     }
 
-    private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int offset)
+    /**
+     * Work for building a column-at-a-time.
+     * @param reader reads the values
+     * @param idx destination in array of {@linkplain Block}s we build
+     */
+    private record ColumnAtATimeWork(BlockLoader.ColumnAtATimeReader reader, int idx) {}
+
+    /**
+     * Work for building a block row-by-row.
+     * @param reader reads the values
+     * @param builder builds the destination block
+     * @param loader the {@link BlockLoader} backing the reader and builder
+     * @param idx destination in the array of {@linkplain Block}s we build
+     */
+    private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int idx)
         implements
             Releasable {
         void read(int doc, BlockLoaderStoredFieldsFromLeafLoader storedFields) throws IOException {
@@ -196,4 +227,12 @@ class ValuesFromSingleReader extends ValuesReader {
             builder.close();
         }
     }
+
+    private long estimatedRamBytesUsed(List<RowStrideReaderWork> rowStrideReaders) {
+        long estimated = 0;
+        for (RowStrideReaderWork r : rowStrideReaders) {
+            estimated += r.builder.estimatedBytes();
+        }
+        return estimated;
+    }
 }
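
The resume contract, roughly: row-stride readers stop once the estimate goes jumbo, `docs.setCount(p)` chops the page, column-at-a-time readers fill only the surviving rows, and the next `Driver` iteration picks up at the new offset. A simplified sketch of that contract (assumed shape, not the real `ValuesReader` API):

```
class ResumeSketch {
    interface PartialLoader {
        /** Loads positions [offset, count), possibly stopping early; returns the next offset. */
        int load(int offset);
    }

    /** Each call emits one (possibly short) page; iteration resumes where the last one stopped. */
    static void driveToCompletion(PartialLoader loader, int positionCount) {
        int offset = 0;
        while (offset < positionCount) {
            offset = loader.load(offset);
        }
    }
}
```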

+ 0 - 3
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java

@@ -36,9 +36,6 @@ public abstract class ValuesReader implements ReleasableIterator<Block[]> {
         boolean success = false;
         try {
             load(target, offset);
-            if (target[0].getPositionCount() != docs.getPositionCount()) {
-                throw new IllegalStateException("partial pages not yet supported");
-            }
             success = true;
             for (Block b : target) {
                 operator.valuesLoaded += b.getTotalValueCount();

+ 69 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReaderDocs.java

@@ -0,0 +1,69 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.elasticsearch.compute.data.DocVector;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.index.mapper.BlockLoader;
+
+/**
+ * Implementation of {@link BlockLoader.Docs} for ESQL. It's important that
+ * only this implementation and the one returned by {@link #mapped} exist.
+ * This allows the JVM to inline the {@code invokevirtual}s that call the
+ * interface in hot, hot code.
+ * <p>
+ *     We've investigated moving the {@code offset} parameter from
+ *     {@link BlockLoader.ColumnAtATimeReader#read} into this class. That's
+ *     more readable, but a clock cycle slower.
+ * </p>
+ * <p>
+ *     We also tried a {@link Nullable} map member instead of a subclass;
+ *     that was slower too.
+ * </p>
+ */
+class ValuesReaderDocs implements BlockLoader.Docs {
+    private final DocVector docs;
+    private int count;
+
+    ValuesReaderDocs(DocVector docs) {
+        this.docs = docs;
+        this.count = docs.getPositionCount();
+    }
+
+    final Mapped mapped(int[] forwards) {
+        return new Mapped(docs, forwards);
+    }
+
+    public final void setCount(int count) {
+        this.count = count;
+    }
+
+    @Override
+    public final int count() {
+        return count;
+    }
+
+    @Override
+    public int get(int i) {
+        return docs.docs().getInt(i);
+    }
+
+    private class Mapped extends ValuesReaderDocs {
+        private final int[] forwards;
+
+        private Mapped(DocVector docs, int[] forwards) {
+            super(docs);
+            this.forwards = forwards;
+        }
+
+        @Override
+        public int get(int i) {
+            return super.get(forwards[i]);
+        }
+    }
+}
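
A toy model of the `mapped` view: `get(i)` resolves through the `forwards` permutation so rows are read in ascending doc order:

```
class MappedDocsSketch {
    public static void main(String[] args) {
        int[] docs = { 9, 3, 7 };       // doc ids in incoming page order
        int[] forwards = { 1, 2, 0 };   // permutation into ascending doc order
        for (int i = 0; i < docs.length; i++) {
            System.out.print(docs[forwards[i]] + " "); // prints: 3 7 9
        }
    }
}
```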

+ 26 - 5
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java

@@ -9,6 +9,7 @@ package org.elasticsearch.compute.lucene.read;
 
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.DocBlock;
@@ -42,7 +43,9 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingToIteratorOpe
      * @param shardContexts per-shard loading information
      * @param docChannel the channel containing the shard, leaf/segment and doc id
      */
-    public record Factory(List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) implements OperatorFactory {
+    public record Factory(ByteSizeValue jumboSize, List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel)
+        implements
+            OperatorFactory {
         public Factory {
             if (fields.isEmpty()) {
                 throw new IllegalStateException("ValuesSourceReaderOperator doesn't support empty fields");
@@ -51,7 +54,7 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingToIteratorOpe
 
         @Override
         public Operator get(DriverContext driverContext) {
-            return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel);
+            return new ValuesSourceReaderOperator(driverContext.blockFactory(), jumboSize.getBytes(), fields, shardContexts, docChannel);
         }
 
         @Override
@@ -85,10 +88,21 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingToIteratorOpe
 
     public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader, double storedFieldsSequentialProportion) {}
 
+    final BlockFactory blockFactory;
+    /**
+     * When the loaded fields {@link Block}s' estimated size grows larger than this,
+     * we finish loading the {@linkplain Page} and return it, even if
+     * the {@linkplain Page} is shorter than the incoming {@linkplain Page}.
+     * <p>
+     *     NOTE: This only applies when loading single segment non-descending
+     *     row stride bytes. This is the most common way to get giant fields,
+     *     but it isn't all the ways.
+     * </p>
+     */
+    final long jumboBytes;
     final FieldWork[] fields;
     final List<ShardContext> shardContexts;
     private final int docChannel;
-    final BlockFactory blockFactory;
 
     private final Map<String, Integer> readersBuilt = new TreeMap<>();
     long valuesLoaded;
@@ -101,14 +115,21 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingToIteratorOpe
      * @param fields fields to load
      * @param docChannel the channel containing the shard, leaf/segment and doc id
      */
-    public ValuesSourceReaderOperator(BlockFactory blockFactory, List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) {
+    public ValuesSourceReaderOperator(
+        BlockFactory blockFactory,
+        long jumboBytes,
+        List<FieldInfo> fields,
+        List<ShardContext> shardContexts,
+        int docChannel
+    ) {
         if (fields.isEmpty()) {
             throw new IllegalStateException("ValuesSourceReaderOperator doesn't support empty fields");
         }
+        this.blockFactory = blockFactory;
+        this.jumboBytes = jumboBytes;
         this.fields = fields.stream().map(FieldWork::new).toArray(FieldWork[]::new);
         this.shardContexts = shardContexts;
         this.docChannel = docChannel;
-        this.blockFactory = blockFactory;
     }
 
     @Override
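
Call sites now lead with the jumbo size, as the test updates below show. A hypothetical construction, assuming `fields`, `shardContexts`, and `docChannel` are already in scope:

```
ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory(
    ByteSizeValue.ofMb(1), // emit a short page once loaded blocks exceed ~1 MB
    fields,                // List<ValuesSourceReaderOperator.FieldInfo>
    shardContexts,         // List<ValuesSourceReaderOperator.ShardContext>
    docChannel             // channel carrying shard / segment / doc ids
);
```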

+ 1 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

@@ -498,6 +498,7 @@ public class OrdinalsGroupingOperator implements Operator {
         ) {
             this.extractor = new ValuesSourceReaderOperator(
                 driverContext.blockFactory(),
+                Long.MAX_VALUE,
                 List.of(new ValuesSourceReaderOperator.FieldInfo(groupingField, groupingElementType, blockLoaders)),
                 shardContexts,
                 docChannel

+ 2 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -289,6 +289,7 @@ public class OperatorTests extends MapperServiceTestCase {
                 operators.add(
                     new ValuesSourceReaderOperator(
                         blockFactory,
+                        ByteSizeValue.ofMb(1).getBytes(),
                         List.of(
                             new ValuesSourceReaderOperator.FieldInfo(
                                 VAL_NAME,
@@ -380,6 +381,7 @@ public class OperatorTests extends MapperServiceTestCase {
                 LuceneOperator.NO_LIMIT
             );
             ValuesSourceReaderOperator.Factory load = new ValuesSourceReaderOperator.Factory(
+                ByteSizeValue.ofGb(1),
                 List.of(
                     new ValuesSourceReaderOperator.FieldInfo("v", ElementType.LONG, f -> new BlockDocValuesReader.LongsBlockLoader("v"))
                 ),
@@ -406,7 +408,6 @@ public class OperatorTests extends MapperServiceTestCase {
             boolean sawSecondMax = false;
             boolean sawThirdMax = false;
             for (Page page : pages) {
-                logger.error("ADFA {}", page);
                 LongVector group = page.<LongBlock>getBlock(1).asVector();
                 LongVector value = page.<LongBlock>getBlock(2).asVector();
                 for (int p = 0; p < page.getPositionCount(); p++) {

+ 2 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java

@@ -23,6 +23,7 @@ import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.apache.lucene.tests.store.BaseDirectoryWrapper;
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.compute.OperatorTests;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefBlock;
@@ -199,6 +200,7 @@ public abstract class LuceneQueryEvaluatorTests<T extends Vector, U extends Vect
             operators.add(
                 new ValuesSourceReaderOperator(
                     blockFactory,
+                    ByteSizeValue.ofGb(1).getBytes(),
                     List.of(
                         new ValuesSourceReaderOperator.FieldInfo(
                             FIELD,

+ 23 - 10
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java

@@ -34,6 +34,7 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.compute.data.Block;
@@ -239,12 +240,17 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
         ElementType elementType,
         BlockLoader loader
     ) {
-        return new ValuesSourceReaderOperator.Factory(List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, shardIdx -> {
-            if (shardIdx < 0 || shardIdx >= INDICES.size()) {
-                fail("unexpected shardIdx [" + shardIdx + "]");
-            }
-            return loader;
-        })), shardContexts, 0);
+        return new ValuesSourceReaderOperator.Factory(
+            ByteSizeValue.ofGb(1),
+            List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, shardIdx -> {
+                if (shardIdx < 0 || shardIdx >= INDICES.size()) {
+                    fail("unexpected shardIdx [" + shardIdx + "]");
+                }
+                return loader;
+            })),
+            shardContexts,
+            0
+        );
     }
 
     protected SourceOperator simpleInput(DriverContext context, int size) {
@@ -491,6 +497,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
         // TODO: Add index2
         operators.add(
             new ValuesSourceReaderOperator.Factory(
+                ByteSizeValue.ofGb(1),
                 List.of(testCase.info, fieldInfo(mapperService(indexKey).fieldType("key"), ElementType.INT)),
                 shardContexts,
                 0
@@ -598,6 +605,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
         List<Operator> operators = new ArrayList<>();
         operators.add(
             new ValuesSourceReaderOperator.Factory(
+                ByteSizeValue.ofGb(1),
                 List.of(
                     fieldInfo(mapperService("index1").fieldType("key"), ElementType.INT),
                     fieldInfo(mapperService("index1").fieldType("indexKey"), ElementType.BYTES_REF)
@@ -612,7 +620,9 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
             cases.removeAll(b);
             tests.addAll(b);
             operators.add(
-                new ValuesSourceReaderOperator.Factory(b.stream().map(i -> i.info).toList(), shardContexts, 0).get(driverContext)
+                new ValuesSourceReaderOperator.Factory(ByteSizeValue.ofGb(1), b.stream().map(i -> i.info).toList(), shardContexts, 0).get(
+                    driverContext
+                )
             );
         }
         List<Page> results = drive(operators, input.iterator(), driverContext);
@@ -716,7 +726,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
             Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING
         );
         List<Operator> operators = cases.stream()
-            .map(i -> new ValuesSourceReaderOperator.Factory(List.of(i.info), shardContexts, 0).get(driverContext))
+            .map(i -> new ValuesSourceReaderOperator.Factory(ByteSizeValue.ofGb(1), List.of(i.info), shardContexts, 0).get(driverContext))
             .toList();
         if (allInOnePage) {
             input = List.of(CannedSourceOperator.mergePages(input));
@@ -1391,6 +1401,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
                 simpleInput(driverContext, 10),
                 List.of(
                     new ValuesSourceReaderOperator.Factory(
+                        ByteSizeValue.ofGb(1),
                         List.of(
                             new ValuesSourceReaderOperator.FieldInfo("null1", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS),
                             new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS)
@@ -1426,6 +1437,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
         List<FieldCase> cases = infoAndChecksForEachType(ordering, ordering);
 
         ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory(
+            ByteSizeValue.ofGb(1),
             cases.stream().map(c -> c.info).toList(),
             List.of(new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
             0
@@ -1471,6 +1483,7 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
             // TODO add index2
             MappedFieldType ft = mapperService(indexKey).fieldType("key");
             var readerFactory = new ValuesSourceReaderOperator.Factory(
+                ByteSizeValue.ofGb(1),
                 List.of(new ValuesSourceReaderOperator.FieldInfo("key", ElementType.INT, shardIdx -> {
                     seenShards.add(shardIdx);
                     return ft.blockLoader(blContext());
@@ -1687,8 +1700,8 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
             }
             return new ColumnAtATimeReader() {
                 @Override
-                public Block read(BlockFactory factory, Docs docs) throws IOException {
-                    Block block = reader.read(factory, docs);
+                public Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+                    Block block = reader.read(factory, docs, offset);
                     Page page = new Page((org.elasticsearch.compute.data.Block) block);
                     return convertEvaluator.eval(page);
                 }

+ 67 - 10
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java

@@ -30,6 +30,7 @@ import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BooleanBlock;
@@ -37,6 +38,7 @@ import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.BytesRefVector;
 import org.elasticsearch.compute.data.DocBlock;
+import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.DoubleVector;
 import org.elasticsearch.compute.data.ElementType;
@@ -98,6 +100,7 @@ import static org.elasticsearch.test.MapMatcher.assertMap;
 import static org.elasticsearch.test.MapMatcher.matchesMap;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
@@ -149,12 +152,14 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
     }
 
     static Operator.OperatorFactory factory(IndexReader reader, String name, ElementType elementType, BlockLoader loader) {
-        return new ValuesSourceReaderOperator.Factory(List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, shardIdx -> {
-            if (shardIdx != 0) {
-                fail("unexpected shardIdx [" + shardIdx + "]");
-            }
-            return loader;
-        })),
+        return new ValuesSourceReaderOperator.Factory(
+            ByteSizeValue.ofGb(1),
+            List.of(new ValuesSourceReaderOperator.FieldInfo(name, elementType, shardIdx -> {
+                if (shardIdx != 0) {
+                    fail("unexpected shardIdx [" + shardIdx + "]");
+                }
+                return loader;
+            })),
             List.of(
                 new ValuesSourceReaderOperator.ShardContext(
                     reader,
@@ -400,7 +405,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             for (int d = 0; d < size; d++) {
                 XContentBuilder source = JsonXContent.contentBuilder();
                 source.startObject();
-                source.field("long_source_text", Integer.toString(d).repeat(100 * 1024));
+                source.field("long_source_text", d + "#" + "a".repeat(100 * 1024));
                 source.endObject();
                 ParsedDocument doc = mapperService.documentParser()
                     .parseDocument(
@@ -488,6 +493,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         );
         operators.add(
             new ValuesSourceReaderOperator.Factory(
+                ByteSizeValue.ofGb(1),
                 List.of(testCase.info, fieldInfo(mapperService.fieldType("key"), ElementType.INT)),
                 List.of(
                     new ValuesSourceReaderOperator.ShardContext(
@@ -607,6 +613,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         List<Operator> operators = new ArrayList<>();
         operators.add(
             new ValuesSourceReaderOperator.Factory(
+                ByteSizeValue.ofGb(1),
                 List.of(fieldInfo(mapperService.fieldType("key"), ElementType.INT)),
                 List.of(
                     new ValuesSourceReaderOperator.ShardContext(
@@ -625,6 +632,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             tests.addAll(b);
             operators.add(
                 new ValuesSourceReaderOperator.Factory(
+                    ByteSizeValue.ofGb(1),
                     b.stream().map(i -> i.info).toList(),
                     List.of(
                         new ValuesSourceReaderOperator.ShardContext(
@@ -723,6 +731,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         List<Operator> operators = cases.stream()
             .map(
                 i -> new ValuesSourceReaderOperator.Factory(
+                    ByteSizeValue.ofGb(1),
                     List.of(i.info),
                     List.of(
                         new ValuesSourceReaderOperator.ShardContext(
@@ -927,7 +936,6 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
     private void testLoadLong(boolean shuffle, boolean manySegments) throws IOException {
         int numDocs = between(10, 500);
         initMapping();
-        keyToTags.clear();
         reader = initIndexLongField(directory, numDocs, manySegments ? commitEvery(numDocs) : numDocs, manySegments == false);
 
         DriverContext driverContext = driverContext();
@@ -940,6 +948,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         if (shuffle) {
             input = input.stream().map(this::shuffle).toList();
         }
+        boolean willSplit = loadLongWillSplit(input);
 
         Checks checks = new Checks(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING);
 
@@ -955,6 +964,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         List<Operator> operators = cases.stream()
             .map(
                 i -> new ValuesSourceReaderOperator.Factory(
+                    ByteSizeValue.ofGb(1),
                     List.of(i.info),
                     List.of(
                         new ValuesSourceReaderOperator.ShardContext(
@@ -967,12 +977,55 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
                 ).get(driverContext)
             )
             .toList();
-        drive(operators, input.iterator(), driverContext);
+        List<Page> result = drive(operators, input.iterator(), driverContext);
+
+        boolean[] found = new boolean[numDocs];
+        for (Page page : result) {
+            BytesRefVector bytes = page.<BytesRefBlock>getBlock(1).asVector();
+            BytesRef scratch = new BytesRef();
+            for (int p = 0; p < bytes.getPositionCount(); p++) {
+                BytesRef v = bytes.getBytesRef(p, scratch);
+                int d = Integer.parseInt(v.utf8ToString().split("#")[0]);
+                assertFalse("found a duplicate " + d, found[d]);
+                found[d] = true;
+            }
+        }
+        List<Integer> missing = new ArrayList<>();
+        for (int d = 0; d < numDocs; d++) {
+            if (found[d] == false) {
+                missing.add(d);
+            }
+        }
+        assertThat(missing, hasSize(0));
+        assertThat(result, hasSize(willSplit ? greaterThanOrEqualTo(input.size()) : equalTo(input.size())));
+
         for (int i = 0; i < cases.size(); i++) {
             ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status();
             assertThat(status.pagesReceived(), equalTo(input.size()));
-            assertThat(status.pagesEmitted(), equalTo(input.size()));
+            assertThat(status.pagesEmitted(), willSplit ? greaterThanOrEqualTo(input.size()) : equalTo(input.size()));
+        }
+    }
+
+    private boolean loadLongWillSplit(List<Page> input) {
+        int nextDoc = -1;
+        for (Page page : input) {
+            DocVector doc = page.<DocBlock>getBlock(0).asVector();
+            for (int p = 0; p < doc.getPositionCount(); p++) {
+                if (doc.shards().getInt(p) != 0) {
+                    return false;
+                }
+                if (doc.segments().getInt(p) != 0) {
+                    return false;
+                }
+                if (nextDoc == -1) {
+                    nextDoc = doc.docs().getInt(p);
+                } else if (doc.docs().getInt(p) != nextDoc) {
+                    return false;
+                }
+                nextDoc++;
+            }
         }
+        return true;
     }
 
     record Checks(Block.MvOrdering booleanAndNumericalDocValuesMvOrdering, Block.MvOrdering bytesRefDocValuesMvOrdering) {
@@ -1567,6 +1620,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
                 simpleInput(driverContext.blockFactory(), 10),
                 List.of(
                     new ValuesSourceReaderOperator.Factory(
+                        ByteSizeValue.ofGb(1),
                         List.of(
                             new ValuesSourceReaderOperator.FieldInfo("null1", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS),
                             new ValuesSourceReaderOperator.FieldInfo("null2", ElementType.NULL, shardIdx -> BlockLoader.CONSTANT_NULLS)
@@ -1619,6 +1673,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         assertThat(source, hasSize(1)); // We want one page for simpler assertions, and we want them all in one segment
         assertTrue(source.get(0).<DocBlock>getBlock(0).asVector().singleSegmentNonDecreasing());
         Operator op = new ValuesSourceReaderOperator.Factory(
+            ByteSizeValue.ofGb(1),
             List.of(
                 fieldInfo(mapperService.fieldType("key"), ElementType.INT),
                 fieldInfo(storedTextField("stored_text"), ElementType.BYTES_REF)
@@ -1656,6 +1711,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         List<FieldCase> cases = infoAndChecksForEachType(ordering, ordering);
 
         ValuesSourceReaderOperator.Factory factory = new ValuesSourceReaderOperator.Factory(
+            ByteSizeValue.ofGb(1),
             cases.stream().map(c -> c.info).toList(),
             List.of(
                 new ValuesSourceReaderOperator.ShardContext(
@@ -1709,6 +1765,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             );
             MappedFieldType ft = mapperService.fieldType("key");
             var readerFactory = new ValuesSourceReaderOperator.Factory(
+                ByteSizeValue.ofGb(1),
                 List.of(new ValuesSourceReaderOperator.FieldInfo("key", ElementType.INT, shardIdx -> {
                     seenShards.add(shardIdx);
                     return ft.blockLoader(blContext());

+ 2 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

@@ -54,6 +54,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.enrich.LookupFromIndexOperator;
 import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
+import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
 import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
@@ -190,6 +191,7 @@ public class LookupFromIndexIT extends AbstractEsqlIntegTestCase {
                 false // no scoring
             );
             ValuesSourceReaderOperator.Factory reader = new ValuesSourceReaderOperator.Factory(
+                PhysicalSettings.VALUES_LOADING_JUMBO_SIZE.getDefault(Settings.EMPTY),
                 List.of(
                     new ValuesSourceReaderOperator.FieldInfo(
                         "key",

+ 1 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

@@ -440,6 +440,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
         }
         return new ValuesSourceReaderOperator(
             driverContext.blockFactory(),
+            Long.MAX_VALUE,
             fields,
             List.of(
                 new ValuesSourceReaderOperator.ShardContext(

+ 12 - 10
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -17,7 +17,6 @@ import org.elasticsearch.common.logging.HeaderWarning;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneCountOperator;
 import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.LuceneSliceQueue;
@@ -108,17 +107,17 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
     }
 
     private final List<ShardContext> shardContexts;
-    private final DataPartitioning defaultDataPartitioning;
+    private final PhysicalSettings physicalSettings;
 
     public EsPhysicalOperationProviders(
         FoldContext foldContext,
         List<ShardContext> shardContexts,
         AnalysisRegistry analysisRegistry,
-        DataPartitioning defaultDataPartitioning
+        PhysicalSettings physicalSettings
     ) {
         super(foldContext, analysisRegistry);
         this.shardContexts = shardContexts;
-        this.defaultDataPartitioning = defaultDataPartitioning;
+        this.physicalSettings = physicalSettings;
     }
 
     @Override
@@ -148,7 +147,10 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
             IntFunction<BlockLoader> loader = s -> getBlockLoaderFor(s, fieldName, isUnsupported, fieldExtractPreference, unionTypes);
             fields.add(new ValuesSourceReaderOperator.FieldInfo(fieldName, elementType, loader));
         }
-        return source.with(new ValuesSourceReaderOperator.Factory(fields, readers, docChannel), layout.build());
+        return source.with(
+            new ValuesSourceReaderOperator.Factory(physicalSettings.valuesLoadingJumboSize(), fields, readers, docChannel),
+            layout.build()
+        );
     }
 
     private BlockLoader getBlockLoaderFor(
@@ -203,7 +205,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
             luceneFactory = new LuceneTopNSourceOperator.Factory(
                 shardContexts,
                 querySupplier(esQueryExec.query()),
-                context.queryPragmas().dataPartitioning(defaultDataPartitioning),
+                context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
                 context.queryPragmas().taskConcurrency(),
                 context.pageSize(rowEstimatedSize),
                 limit,
@@ -217,7 +219,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
                 luceneFactory = new LuceneSourceOperator.Factory(
                     shardContexts,
                     querySupplier(esQueryExec.query()),
-                    context.queryPragmas().dataPartitioning(defaultDataPartitioning),
+                    context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
                     context.queryPragmas().taskConcurrency(),
                     context.pageSize(rowEstimatedSize),
                     limit,
@@ -239,7 +241,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
         return new LuceneCountOperator.Factory(
             shardContexts,
             querySupplier(queryBuilder),
-            context.queryPragmas().dataPartitioning(defaultDataPartitioning),
+            context.queryPragmas().dataPartitioning(physicalSettings.defaultDataPartitioning()),
             context.queryPragmas().taskConcurrency(),
             limit == null ? NO_LIMIT : (Integer) limit.fold(context.foldCtx())
         );
@@ -435,8 +437,8 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
             }
             return new ColumnAtATimeReader() {
                 @Override
-                public Block read(BlockFactory factory, Docs docs) throws IOException {
-                    Block block = reader.read(factory, docs);
+                public Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+                    Block block = reader.read(factory, docs, offset);
                     return typeConverter.convert((org.elasticsearch.compute.data.Block) block);
                 }
 

+ 64 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PhysicalSettings.java

@@ -0,0 +1,64 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner;
+
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.MemorySizeValue;
+import org.elasticsearch.compute.lucene.DataPartitioning;
+import org.elasticsearch.monitor.jvm.JvmInfo;
+
+/**
+ * Values for cluster level settings used in physical planning.
+ */
+public class PhysicalSettings {
+    public static final Setting<DataPartitioning> DEFAULT_DATA_PARTITIONING = Setting.enumSetting(
+        DataPartitioning.class,
+        "esql.default_data_partitioning",
+        DataPartitioning.AUTO,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    public static final Setting<ByteSizeValue> VALUES_LOADING_JUMBO_SIZE = new Setting<>("esql.values_loading_jumbo_size", settings -> {
+        long proportional = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() / 1024;
+        return ByteSizeValue.ofBytes(Math.max(proportional, ByteSizeValue.ofMb(1).getBytes())).getStringRep();
+    },
+        s -> MemorySizeValue.parseBytesSizeValueOrHeapRatio(s, "esql.values_loading_jumbo_size"),
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
+
+    private volatile DataPartitioning defaultDataPartitioning;
+    private volatile ByteSizeValue valuesLoadingJumboSize;
+
+    /**
+     * Ctor for prod that listens for updates from the {@link ClusterService}.
+     */
+    public PhysicalSettings(ClusterService clusterService) {
+        clusterService.getClusterSettings().initializeAndWatch(DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v);
+        clusterService.getClusterSettings().initializeAndWatch(VALUES_LOADING_JUMBO_SIZE, v -> this.valuesLoadingJumboSize = v);
+    }
+
+    /**
+     * Ctor for testing.
+     */
+    public PhysicalSettings(DataPartitioning defaultDataPartitioning, ByteSizeValue valuesLoadingJumboSize) {
+        this.defaultDataPartitioning = defaultDataPartitioning;
+        this.valuesLoadingJumboSize = valuesLoadingJumboSize;
+    }
+
+    public DataPartitioning defaultDataPartitioning() {
+        return defaultDataPartitioning;
+    }
+
+    public ByteSizeValue valuesLoadingJumboSize() {
+        return valuesLoadingJumboSize;
+    }
+}
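
The default works out to heap/1024 with a 1 MiB floor, so a 31 GB heap yields roughly a 31 MB jumbo size. A standalone approximation (using `Runtime` as a stand-in for `JvmInfo`):

```
class JumboDefaultSketch {
    public static void main(String[] args) {
        long heapMax = Runtime.getRuntime().maxMemory(); // stand-in for JvmInfo's heap max
        long floor = 1024L * 1024L;                      // 1 MiB
        long jumbo = Math.max(heapMax / 1024, floor);
        System.out.println("esql.values_loading_jumbo_size default: " + jumbo + " bytes");
    }
}
```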

+ 4 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -18,7 +18,6 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.operator.DriverCompletionInfo;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
 import org.elasticsearch.compute.operator.FailureCollector;
@@ -56,6 +55,7 @@ import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
+import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
 import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 import org.elasticsearch.xpack.esql.session.Configuration;
 import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
@@ -95,8 +95,7 @@ public class ComputeService {
     private final DataNodeComputeHandler dataNodeComputeHandler;
     private final ClusterComputeHandler clusterComputeHandler;
     private final ExchangeService exchangeService;
-
-    private volatile DataPartitioning defaultDataPartitioning;
+    private final PhysicalSettings physicalSettings;
 
     @SuppressWarnings("this-escape")
     public ComputeService(
@@ -133,7 +132,7 @@ public class ComputeService {
             esqlExecutor,
             dataNodeComputeHandler
         );
-        clusterService.getClusterSettings().initializeAndWatch(EsqlPlugin.DEFAULT_DATA_PARTITIONING, v -> this.defaultDataPartitioning = v);
+        this.physicalSettings = new PhysicalSettings(clusterService);
     }
 
     public void execute(
@@ -454,7 +453,7 @@ public class ComputeService {
             context.foldCtx(),
             contexts,
             searchService.getIndicesService().getAnalysis(),
-            defaultDataPartitioning
+            physicalSettings
         );
         try {
             LocalExecutionPlanner planner = new LocalExecutionPlanner(

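Net effect of the `ComputeService` change: the service no longer watches `esql.default_data_partitioning` itself; both dynamic values sit behind the shared `PhysicalSettings` object and are re-read on each query. A hedged sketch of the two construction paths shown in the diff, with `clusterService` assumed to be in scope:

```java
// Production path: the ctor registers cluster-settings watchers, so a later
// settings update is visible to the next volatile read below.
PhysicalSettings prod = new PhysicalSettings(clusterService);

// Test path: fixed values and no watchers.
PhysicalSettings forTests = new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1));

DataPartitioning partitioning = prod.defaultDataPartitioning(); // volatile read
ByteSizeValue jumbo = prod.valuesLoadingJumboSize();            // volatile read
```
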
+ 3 - 10
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -23,7 +23,6 @@ import org.elasticsearch.common.util.FeatureFlag;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockFactoryProvider;
-import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
@@ -71,6 +70,7 @@ import org.elasticsearch.xpack.esql.expression.ExpressionWritables;
 import org.elasticsearch.xpack.esql.io.stream.ExpressionQueryBuilder;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamWrapperQueryBuilder;
 import org.elasticsearch.xpack.esql.plan.PlanWritables;
+import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
 import org.elasticsearch.xpack.esql.querylog.EsqlQueryLog;
 import org.elasticsearch.xpack.esql.session.IndexResolver;
@@ -179,14 +179,6 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
         Setting.Property.Dynamic
     );
 
-    public static final Setting<DataPartitioning> DEFAULT_DATA_PARTITIONING = Setting.enumSetting(
-        DataPartitioning.class,
-        "esql.default_data_partitioning",
-        DataPartitioning.AUTO,
-        Setting.Property.NodeScope,
-        Setting.Property.Dynamic
-    );
-
     @Override
     public Collection<?> createComponents(PluginServices services) {
         CircuitBreaker circuitBreaker = services.indicesService().getBigArrays().breakerService().getBreaker("request");
@@ -250,8 +242,9 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
             ESQL_QUERYLOG_THRESHOLD_INFO_SETTING,
             ESQL_QUERYLOG_THRESHOLD_WARN_SETTING,
             ESQL_QUERYLOG_INCLUDE_USER_SETTING,
+            PhysicalSettings.DEFAULT_DATA_PARTITIONING,
+            PhysicalSettings.VALUES_LOADING_JUMBO_SIZE,
             STORED_FIELDS_SEQUENTIAL_PROPORTION,
-            DEFAULT_DATA_PARTITIONING,
             EsqlFlags.ESQL_STRING_LIKE_ON_INDEX
         );
     }

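Both settings are registered here and marked `Dynamic`, so they can be flipped on a live cluster. A sketch of how a test might exercise that path; `applySettings` is the generic `ClusterSettings` entry point, and `clusterService` is an assumed variable:

```java
// Illustrative only: the watcher installed by PhysicalSettings picks this up
// and updates its volatile field; no node restart is needed.
clusterService.getClusterSettings()
    .applySettings(Settings.builder().put("esql.values_loading_jumbo_size", "2mb").build());
```
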
+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

@@ -21,6 +21,7 @@ import org.elasticsearch.compute.operator.DriverStatus;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
 
 import java.io.IOException;
 import java.util.Locale;
@@ -44,7 +45,7 @@ public final class QueryPragmas implements Writeable {
      * the enum {@link DataPartitioning} which has more documentation. Not an
      * {@link Setting#enumSetting} because those can't have {@code null} defaults.
      * {@code null} here means "use the default from the cluster setting
-     * named {@link EsqlPlugin#DEFAULT_DATA_PARTITIONING}."
+     * named {@link PhysicalSettings#DEFAULT_DATA_PARTITIONING}."
      */
     public static final Setting<String> DATA_PARTITIONING = Setting.simpleString("data_partitioning");
 

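The javadoc above explains why `data_partitioning` stays a plain string: an empty pragma must be able to mean "fall back to the cluster-wide default". Purely as illustration (the lookup below is a guessed shape, not the planner's actual code):

```java
// Hypothetical fallback from the per-request pragma to the cluster default.
String pragma = QueryPragmas.DATA_PARTITIONING.get(pragmaSettings); // "" when unset
DataPartitioning partitioning = pragma.isEmpty()
    ? physicalSettings.defaultDataPartitioning()                    // cluster setting
    : DataPartitioning.valueOf(pragma.toUpperCase(Locale.ROOT));    // explicit pragma
```
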
+ 8 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.Build;
 import org.elasticsearch.common.geo.ShapeRelation;
 import org.elasticsearch.common.lucene.BytesRefs;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.compute.aggregation.AggregatorMode;
@@ -135,6 +136,7 @@ import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
 import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
 import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
 import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
+import org.elasticsearch.xpack.esql.planner.PhysicalSettings;
 import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
 import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
@@ -7876,7 +7878,12 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             null,
             null,
             null,
-            new EsPhysicalOperationProviders(FoldContext.small(), List.of(), null, DataPartitioning.AUTO),
+            new EsPhysicalOperationProviders(
+                FoldContext.small(),
+                List.of(),
+                null,
+                new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1))
+            ),
             List.of()
         );
 

+ 7 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

@@ -18,6 +18,7 @@ import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
@@ -229,7 +230,12 @@ public class LocalExecutionPlannerTests extends MapperServiceTestCase {
     }
 
     private EsPhysicalOperationProviders esPhysicalOperationProviders(List<EsPhysicalOperationProviders.ShardContext> shardContexts) {
-        return new EsPhysicalOperationProviders(FoldContext.small(), shardContexts, null, DataPartitioning.AUTO);
+        return new EsPhysicalOperationProviders(
+            FoldContext.small(),
+            shardContexts,
+            null,
+            new PhysicalSettings(DataPartitioning.AUTO, ByteSizeValue.ofMb(1))
+        );
     }
 
     private List<EsPhysicalOperationProviders.ShardContext> createShardContexts() throws IOException {

+ 15 - 11
x-pack/plugin/mapper-aggregate-metric/src/main/java/org/elasticsearch/xpack/aggregatemetric/mapper/AggregateMetricDoubleFieldMapper.java

@@ -571,20 +571,24 @@ public class AggregateMetricDoubleFieldMapper extends FieldMapper {
                     }
 
                     @Override
-                    public Block read(BlockFactory factory, Docs docs) throws IOException {
-                        try (var builder = factory.aggregateMetricDoubleBuilder(docs.count())) {
-                            copyDoubleValuesToBuilder(docs, builder.min(), minValues);
-                            copyDoubleValuesToBuilder(docs, builder.max(), maxValues);
-                            copyDoubleValuesToBuilder(docs, builder.sum(), sumValues);
-                            copyIntValuesToBuilder(docs, builder.count(), valueCountValues);
+                    public Block read(BlockFactory factory, Docs docs, int offset) throws IOException {
+                        try (var builder = factory.aggregateMetricDoubleBuilder(docs.count() - offset)) {
+                            copyDoubleValuesToBuilder(docs, offset, builder.min(), minValues);
+                            copyDoubleValuesToBuilder(docs, offset, builder.max(), maxValues);
+                            copyDoubleValuesToBuilder(docs, offset, builder.sum(), sumValues);
+                            copyIntValuesToBuilder(docs, offset, builder.count(), valueCountValues);
                             return builder.build();
                         }
                     }
 
-                    private void copyDoubleValuesToBuilder(Docs docs, BlockLoader.DoubleBuilder builder, NumericDocValues values)
-                        throws IOException {
+                    private void copyDoubleValuesToBuilder(
+                        Docs docs,
+                        int offset,
+                        BlockLoader.DoubleBuilder builder,
+                        NumericDocValues values
+                    ) throws IOException {
                         int lastDoc = -1;
-                        for (int i = 0; i < docs.count(); i++) {
+                        for (int i = offset; i < docs.count(); i++) {
                             int doc = docs.get(i);
                             if (doc < lastDoc) {
                                 throw new IllegalStateException("docs within same block must be in order");
@@ -600,10 +604,10 @@ public class AggregateMetricDoubleFieldMapper extends FieldMapper {
                         }
                     }
 
-                    private void copyIntValuesToBuilder(Docs docs, BlockLoader.IntBuilder builder, NumericDocValues values)
+                    private void copyIntValuesToBuilder(Docs docs, int offset, BlockLoader.IntBuilder builder, NumericDocValues values)
                         throws IOException {
                         int lastDoc = -1;
-                        for (int i = 0; i < docs.count(); i++) {
+                        for (int i = offset; i < docs.count(); i++) {
                             int doc = docs.get(i);
                             if (doc < lastDoc) {
                                 throw new IllegalStateException("docs within same block must be in order");

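The new `offset` parameter is what lets column-at-a-time readers resume part-way through a `Docs` block after a jumbo split: the builder above is sized `docs.count() - offset` and the copy loops start at `offset`. A hedged sketch of that contract, with `reader`, `factory`, and `docs` assumed in scope:

```java
// Per the loops above, read(...) emits one position per doc in
// [offset, docs.count()). With a 10-doc Docs:
Block whole = reader.read(factory, docs, 0); // 10 positions, docs 0..9
Block tail  = reader.read(factory, docs, 3); // 7 positions, docs 3..9
```
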
+ 2 - 2
x-pack/plugin/mapper-constant-keyword/src/test/java/org/elasticsearch/xpack/constantkeyword/mapper/ConstantKeywordFieldMapperTests.java

@@ -276,7 +276,7 @@ public class ConstantKeywordFieldMapperTests extends MapperTestCase {
             iw.close();
             try (DirectoryReader reader = DirectoryReader.open(directory)) {
                 TestBlock block = (TestBlock) loader.columnAtATimeReader(reader.leaves().get(0))
-                    .read(TestBlock.factory(reader.numDocs()), new BlockLoader.Docs() {
+                    .read(TestBlock.factory(), new BlockLoader.Docs() {
                         @Override
                         public int count() {
                             return 1;
@@ -286,7 +286,7 @@ public class ConstantKeywordFieldMapperTests extends MapperTestCase {
                         public int get(int i) {
                             return 0;
                         }
-                    });
+                    }, 0);
                 assertThat(block.get(0), nullValue());
             }
         }
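
The anonymous `Docs` in the test above needs only `count()` and `get(int)`. A reusable test helper under the same assumptions might look like:

```java
// Fixed Docs view over an int array, equivalent to the inline
// implementation in the test above.
static BlockLoader.Docs docsOf(int... ids) {
    return new BlockLoader.Docs() {
        @Override
        public int count() {
            return ids.length;
        }

        @Override
        public int get(int i) {
            return ids[i];
        }
    };
}
```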