
ESQL: Refactor value reading so it can split Pages (#130573)

This refactors our `ValuesSourceReaderOperator` so it can split pages
when it reads large values. It does not *actually* split the pages as
that's a bit tricky. But it sets the stage for the next PR that will
do so.

* Move `ValuesSourceReaderOperator` to its own package
* Move many inner classes into their own top level classes
* Extend from `AbstractPageMappingToIteratorOperator` instead of
  `AbstractPageMappingOperator`
  * This allows returning more than one `Page` per input `Page`
  * In this PR we still always return one `Page` per input `Page`
    (see the sketch after this list)
  * Make new `ReleasableIterator` subclasses to satisfy
    `AbstractPageMappingToIteratorOperator`
  * Change `status` of loading fields from `pages_processed` to
    `pages_received` and `pages_emitted`
* Fix a bug in `AbstractPageMappingToIteratorOperator` that could leak a
  circuit breaker allocation if we fail during `receive`. This isn't possible
  in the existing implementations but is possible
  in `ValuesSourceReaderOperator`.
* Add a test with large text fields. Right now the result still comes back
  in one page because we don't yet split the pages.

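To make the new shape concrete, here is a minimal sketch of the page-to-iterator pattern the bullets above describe. The names (`FakePage`, `PageToIteratorOperator`, `single`) are made up for illustration and are deliberately much simpler than the real `Page` and `AbstractPageMappingToIteratorOperator` APIs; the sketch only assumes what the description states: each received page maps to a `ReleasableIterator` of output pages (still exactly one per input in this PR), the operator counts `pages_received`/`pages_emitted`, and a failure during `receive` must release the input page rather than leak its circuit breaker allocation.

```java
// Illustrative sketch only: simplified, hypothetical stand-ins, not the real
// org.elasticsearch.compute classes or their signatures.
import java.util.Iterator;
import java.util.NoSuchElementException;

interface Releasable extends AutoCloseable {
    @Override
    void close(); // no checked exception
}

/** An iterator that must be closed so it can release memory it still holds. */
interface ReleasableIterator<T> extends Iterator<T>, Releasable {}

/** Hypothetical page whose memory is tracked by a circuit breaker. */
final class FakePage implements Releasable {
    @Override
    public void close() {
        // a real Page would release its blocks and breaker accounting here
    }
}

/** Pattern sketch: map each received page to an iterator of output pages. */
abstract class PageToIteratorOperator {
    private ReleasableIterator<FakePage> pending;
    int pagesReceived;
    int pagesEmitted;

    /** Turn one input page into an iterator of output pages, possibly several. */
    protected abstract ReleasableIterator<FakePage> receive(FakePage page);

    public final void addInput(FakePage page) {
        pagesReceived++;
        boolean success = false;
        try {
            pending = receive(page);
            success = true;
        } finally {
            // The leak fix described above: if receive throws, release the input
            // page instead of abandoning its circuit breaker allocation.
            if (success == false) {
                page.close();
            }
        }
    }

    public final FakePage getOutput() {
        if (pending == null || pending.hasNext() == false) {
            return null;
        }
        FakePage out = pending.next();
        pagesEmitted++;
        if (pending.hasNext() == false) {
            pending.close();
            pending = null;
        }
        return out;
    }

    /** The "one output page per input page" behaviour this PR keeps for now. */
    protected static ReleasableIterator<FakePage> single(FakePage page) {
        return new ReleasableIterator<FakePage>() {
            private FakePage remaining = page;

            @Override
            public boolean hasNext() {
                return remaining != null;
            }

            @Override
            public FakePage next() {
                if (remaining == null) {
                    throw new NoSuchElementException();
                }
                FakePage out = remaining;
                remaining = null;
                return out;
            }

            @Override
            public void close() {
                if (remaining != null) {
                    remaining.close();
                    remaining = null;
                }
            }
        };
    }
}
```

Under those assumptions, the next PR can make `receive` return an iterator that yields several smaller pages for a large input without changing the operator's outer contract.
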
Closes #130727
Nik Everett 3 months ago
commit 75fe33dc8f
35 files changed with 1307 additions and 878 deletions
  1. 4 3
      benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java
  2. 5 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  3. 1 0
      x-pack/plugin/esql/compute/src/main/java/module-info.java
  4. 0 792
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java
  5. 45 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java
  6. 93 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java
  7. 3 2
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TimeSeriesExtractFieldOperator.java
  8. 166 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java
  9. 194 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java
  10. 58 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java
  11. 306 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java
  12. 159 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatus.java
  13. 1 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java
  14. 49 13
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java
  15. 1 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java
  16. 1 1
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
  17. 1 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java
  18. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java
  19. 1 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java
  20. 2 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java
  21. 2 1
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java
  22. 3 0
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java
  23. 13 4
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java
  24. 29 17
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatusTests.java
  25. 136 17
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java
  26. 3 3
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java
  27. 3 3
      x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java
  28. 5 4
      x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java
  29. 4 3
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
  30. 1 1
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java
  31. 2 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
  32. 10 4
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
  33. 2 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
  34. 2 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java
  35. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

+ 4 - 3
benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

@@ -41,7 +41,8 @@ import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.ShardRefCounted;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
 import org.elasticsearch.compute.operator.topn.TopNOperator;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.index.IndexSettings;
@@ -343,7 +344,7 @@ public class ValuesSourceReaderBenchmark {
         );
         long sum = 0;
         for (Page page : pages) {
-            op.addInput(page);
+            op.addInput(page.shallowCopy());
             switch (name) {
                 case "long" -> {
                     LongVector values = op.getOutput().<LongBlock>getBlock(1).asVector();
@@ -411,7 +412,7 @@ public class ValuesSourceReaderBenchmark {
             throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]");
         }
         boolean foundStoredFieldLoader = false;
-        ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status();
+        ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status();
         for (Map.Entry<String, Integer> e : status.readersBuilt().entrySet()) {
             if (e.getKey().indexOf("stored_fields") >= 0) {
                 foundStoredFieldLoader = true;

+ 5 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -210,6 +210,7 @@ public class TransportVersions {
     public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60);
     public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61);
     public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN_8_19 = def(8_841_0_62);
+    public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_8_19 = def(8_841_0_63);
     public static final TransportVersion V_9_0_0 = def(9_000_0_09);
     public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10);
     public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_2 = def(9_000_0_11);
@@ -326,9 +327,13 @@ public class TransportVersions {
     public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
     public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
     public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
+    public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_9_1 = def(9_112_0_01);
+    // Below is the first version in 9.2 and NOT in 9.1.
     public static final TransportVersion PROJECT_STATE_REGISTRY_RECORDS_DELETIONS = def(9_113_0_00);
     public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_114_0_00);
     public static final TransportVersion ML_INFERENCE_IBM_WATSONX_COMPLETION_ADDED = def(9_115_0_00);
+    public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES = def(9_116_0_00);
+
     /*
      * STOP! READ THIS FIRST! No, really,
      *        ____ _____ ___  ____  _        ____  _____    _    ____    _____ _   _ ___ ____    _____ ___ ____  ____ _____ _

+ 1 - 0
x-pack/plugin/esql/compute/src/main/java/module-info.java

@@ -36,4 +36,5 @@ module org.elasticsearch.compute {
     exports org.elasticsearch.compute.aggregation.table;
     exports org.elasticsearch.compute.data.sort;
     exports org.elasticsearch.compute.querydsl.query;
+    exports org.elasticsearch.compute.lucene.read;
 }

+ 0 - 792
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

@@ -1,792 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.compute.lucene;
-
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.SortedDocValues;
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.TransportVersion;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.BytesRefBlock;
-import org.elasticsearch.compute.data.DocBlock;
-import org.elasticsearch.compute.data.DocVector;
-import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.data.IntVector;
-import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.data.SingletonOrdinalsBuilder;
-import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
-import org.elasticsearch.compute.operator.DriverContext;
-import org.elasticsearch.compute.operator.Operator;
-import org.elasticsearch.core.Releasable;
-import org.elasticsearch.core.Releasables;
-import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
-import org.elasticsearch.index.mapper.BlockLoader;
-import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
-import org.elasticsearch.index.mapper.SourceLoader;
-import org.elasticsearch.search.fetch.StoredFieldsSpec;
-import org.elasticsearch.xcontent.XContentBuilder;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
-import java.util.function.IntFunction;
-import java.util.function.Supplier;
-
-import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;
-import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
-
-/**
- * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator}
- * and outputs them to a new column.
- */
-public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
-    /**
-     * Minimum number of documents for which it is more efficient to use a
-     * sequential stored field reader when reading stored fields.
-     * <p>
-     *     The sequential stored field reader decompresses a whole block of docs
-     *     at a time so for very short lists it won't be faster to use it. We use
-     *     {@code 10} documents as the boundary for "very short" because it's what
-     *     search does, not because we've done extensive testing on the number.
-     * </p>
-     */
-    static final int SEQUENTIAL_BOUNDARY = 10;
-
-    /**
-     * Creates a factory for {@link ValuesSourceReaderOperator}.
-     * @param fields fields to load
-     * @param shardContexts per-shard loading information
-     * @param docChannel the channel containing the shard, leaf/segment and doc id
-     */
-    public record Factory(List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) implements OperatorFactory {
-        @Override
-        public Operator get(DriverContext driverContext) {
-            return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel);
-        }
-
-        @Override
-        public String describe() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("ValuesSourceReaderOperator[fields = [");
-            if (fields.size() < 10) {
-                boolean first = true;
-                for (FieldInfo f : fields) {
-                    if (first) {
-                        first = false;
-                    } else {
-                        sb.append(", ");
-                    }
-                    sb.append(f.name);
-                }
-            } else {
-                sb.append(fields.size()).append(" fields");
-            }
-            return sb.append("]]").toString();
-        }
-    }
-
-    /**
-     * Configuration for a field to load.
-     *
-     * {@code blockLoader} maps shard index to the {@link BlockLoader}s
-     * which load the actual blocks.
-     */
-    public record FieldInfo(String name, ElementType type, IntFunction<BlockLoader> blockLoader) {}
-
-    public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader, double storedFieldsSequentialProportion) {}
-
-    private final FieldWork[] fields;
-    private final List<ShardContext> shardContexts;
-    private final int docChannel;
-    private final BlockFactory blockFactory;
-
-    private final Map<String, Integer> readersBuilt = new TreeMap<>();
-    private long valuesLoaded;
-
-    int lastShard = -1;
-    int lastSegment = -1;
-
-    /**
-     * Creates a new extractor
-     * @param fields fields to load
-     * @param docChannel the channel containing the shard, leaf/segment and doc id
-     */
-    public ValuesSourceReaderOperator(BlockFactory blockFactory, List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) {
-        this.fields = fields.stream().map(f -> new FieldWork(f)).toArray(FieldWork[]::new);
-        this.shardContexts = shardContexts;
-        this.docChannel = docChannel;
-        this.blockFactory = blockFactory;
-    }
-
-    @Override
-    protected Page process(Page page) {
-        DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
-
-        Block[] blocks = new Block[fields.length];
-        boolean success = false;
-        try {
-            if (docVector.singleSegmentNonDecreasing()) {
-                IntVector docs = docVector.docs();
-                int shard = docVector.shards().getInt(0);
-                int segment = docVector.segments().getInt(0);
-                loadFromSingleLeaf(blocks, shard, segment, new BlockLoader.Docs() {
-                    @Override
-                    public int count() {
-                        return docs.getPositionCount();
-                    }
-
-                    @Override
-                    public int get(int i) {
-                        return docs.getInt(i);
-                    }
-                });
-            } else if (docVector.singleSegment()) {
-                loadFromSingleLeafUnsorted(blocks, docVector);
-            } else {
-                try (LoadFromMany many = new LoadFromMany(blocks, docVector)) {
-                    many.run();
-                }
-            }
-            success = true;
-            for (Block b : blocks) {
-                valuesLoaded += b.getTotalValueCount();
-            }
-            return page.appendBlocks(blocks);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        } finally {
-            if (success == false) {
-                Releasables.closeExpectNoException(blocks);
-            }
-        }
-    }
-
-    private void positionFieldWork(int shard, int segment, int firstDoc) {
-        if (lastShard == shard) {
-            if (lastSegment == segment) {
-                for (FieldWork w : fields) {
-                    w.sameSegment(firstDoc);
-                }
-                return;
-            }
-            lastSegment = segment;
-            for (FieldWork w : fields) {
-                w.sameShardNewSegment();
-            }
-            return;
-        }
-        lastShard = shard;
-        lastSegment = segment;
-        for (FieldWork w : fields) {
-            w.newShard(shard);
-        }
-    }
-
-    private boolean positionFieldWorkDocGuarteedAscending(int shard, int segment) {
-        if (lastShard == shard) {
-            if (lastSegment == segment) {
-                return false;
-            }
-            lastSegment = segment;
-            for (FieldWork w : fields) {
-                w.sameShardNewSegment();
-            }
-            return true;
-        }
-        lastShard = shard;
-        lastSegment = segment;
-        for (FieldWork w : fields) {
-            w.newShard(shard);
-        }
-        return true;
-    }
-
-    private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoader.Docs docs) throws IOException {
-        int firstDoc = docs.get(0);
-        positionFieldWork(shard, segment, firstDoc);
-        StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
-        List<RowStrideReaderWork> rowStrideReaders = new ArrayList<>(fields.length);
-        LeafReaderContext ctx = ctx(shard, segment);
-        try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(blockFactory, docs.count())) {
-            for (int f = 0; f < fields.length; f++) {
-                FieldWork field = fields[f];
-                BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx);
-                if (columnAtATime != null) {
-                    blocks[f] = (Block) columnAtATime.read(loaderBlockFactory, docs);
-                    sanityCheckBlock(columnAtATime, docs.count(), blocks[f], f);
-                } else {
-                    rowStrideReaders.add(
-                        new RowStrideReaderWork(
-                            field.rowStride(ctx),
-                            (Block.Builder) field.loader.builder(loaderBlockFactory, docs.count()),
-                            field.loader,
-                            f
-                        )
-                    );
-                    storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec());
-                }
-            }
-
-            SourceLoader sourceLoader = null;
-            ShardContext shardContext = shardContexts.get(shard);
-            if (storedFieldsSpec.requiresSource()) {
-                sourceLoader = shardContext.newSourceLoader.get();
-                storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
-            }
-
-            if (rowStrideReaders.isEmpty()) {
-                return;
-            }
-            if (storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) {
-                throw new IllegalStateException(
-                    "found row stride readers [" + rowStrideReaders + "] without stored fields [" + storedFieldsSpec + "]"
-                );
-            }
-            StoredFieldLoader storedFieldLoader;
-            if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) {
-                storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec);
-                trackStoredFields(storedFieldsSpec, true);
-            } else {
-                storedFieldLoader = StoredFieldLoader.fromSpec(storedFieldsSpec);
-                trackStoredFields(storedFieldsSpec, false);
-            }
-            BlockLoaderStoredFieldsFromLeafLoader storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
-                storedFieldLoader.getLoader(ctx, null),
-                sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null
-            );
-            for (int p = 0; p < docs.count(); p++) {
-                int doc = docs.get(p);
-                storedFields.advanceTo(doc);
-                for (RowStrideReaderWork work : rowStrideReaders) {
-                    work.read(doc, storedFields);
-                }
-            }
-            for (RowStrideReaderWork work : rowStrideReaders) {
-                blocks[work.offset] = work.build();
-                sanityCheckBlock(work.reader, docs.count(), blocks[work.offset], work.offset);
-            }
-        } finally {
-            Releasables.close(rowStrideReaders);
-        }
-    }
-
-    private void loadFromSingleLeafUnsorted(Block[] blocks, DocVector docVector) throws IOException {
-        IntVector docs = docVector.docs();
-        int[] forwards = docVector.shardSegmentDocMapForwards();
-        int shard = docVector.shards().getInt(0);
-        int segment = docVector.segments().getInt(0);
-        loadFromSingleLeaf(blocks, shard, segment, new BlockLoader.Docs() {
-            @Override
-            public int count() {
-                return docs.getPositionCount();
-            }
-
-            @Override
-            public int get(int i) {
-                return docs.getInt(forwards[i]);
-            }
-        });
-        final int[] backwards = docVector.shardSegmentDocMapBackwards();
-        for (int i = 0; i < blocks.length; i++) {
-            Block in = blocks[i];
-            blocks[i] = in.filter(backwards);
-            in.close();
-        }
-    }
-
-    private class LoadFromMany implements Releasable {
-        private final Block[] target;
-        private final IntVector shards;
-        private final IntVector segments;
-        private final IntVector docs;
-        private final int[] forwards;
-        private final int[] backwards;
-        private final Block.Builder[][] builders;
-        private final BlockLoader[][] converters;
-        private final Block.Builder[] fieldTypeBuilders;
-        private final BlockLoader.RowStrideReader[] rowStride;
-
-        BlockLoaderStoredFieldsFromLeafLoader storedFields;
-
-        LoadFromMany(Block[] target, DocVector docVector) {
-            this.target = target;
-            shards = docVector.shards();
-            segments = docVector.segments();
-            docs = docVector.docs();
-            forwards = docVector.shardSegmentDocMapForwards();
-            backwards = docVector.shardSegmentDocMapBackwards();
-            fieldTypeBuilders = new Block.Builder[target.length];
-            builders = new Block.Builder[target.length][shardContexts.size()];
-            converters = new BlockLoader[target.length][shardContexts.size()];
-            rowStride = new BlockLoader.RowStrideReader[target.length];
-        }
-
-        void run() throws IOException {
-            for (int f = 0; f < fields.length; f++) {
-                /*
-                 * Important note: each field has a desired type, which might not match the mapped type (in the case of union-types).
-                 * We create the final block builders using the desired type, one for each field, but then also use inner builders
-                 * (one for each field and shard), and converters (again one for each field and shard) to actually perform the field
-                 * loading in a way that is correct for the mapped field type, and then convert between that type and the desired type.
-                 */
-                fieldTypeBuilders[f] = fields[f].info.type.newBlockBuilder(docs.getPositionCount(), blockFactory);
-                builders[f] = new Block.Builder[shardContexts.size()];
-                converters[f] = new BlockLoader[shardContexts.size()];
-            }
-            try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(blockFactory, docs.getPositionCount())) {
-                int p = forwards[0];
-                int shard = shards.getInt(p);
-                int segment = segments.getInt(p);
-                int firstDoc = docs.getInt(p);
-                positionFieldWork(shard, segment, firstDoc);
-                LeafReaderContext ctx = ctx(shard, segment);
-                fieldsMoved(ctx, shard);
-                verifyBuilders(loaderBlockFactory, shard);
-                read(firstDoc, shard);
-                for (int i = 1; i < forwards.length; i++) {
-                    p = forwards[i];
-                    shard = shards.getInt(p);
-                    segment = segments.getInt(p);
-                    boolean changedSegment = positionFieldWorkDocGuarteedAscending(shard, segment);
-                    if (changedSegment) {
-                        ctx = ctx(shard, segment);
-                        fieldsMoved(ctx, shard);
-                    }
-                    verifyBuilders(loaderBlockFactory, shard);
-                    read(docs.getInt(p), shard);
-                }
-            }
-            for (int f = 0; f < target.length; f++) {
-                for (int s = 0; s < shardContexts.size(); s++) {
-                    if (builders[f][s] != null) {
-                        try (Block orig = (Block) converters[f][s].convert(builders[f][s].build())) {
-                            fieldTypeBuilders[f].copyFrom(orig, 0, orig.getPositionCount());
-                        }
-                    }
-                }
-                try (Block targetBlock = fieldTypeBuilders[f].build()) {
-                    target[f] = targetBlock.filter(backwards);
-                }
-                sanityCheckBlock(rowStride[f], docs.getPositionCount(), target[f], f);
-            }
-        }
-
-        private void fieldsMoved(LeafReaderContext ctx, int shard) throws IOException {
-            StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
-            for (int f = 0; f < fields.length; f++) {
-                FieldWork field = fields[f];
-                rowStride[f] = field.rowStride(ctx);
-                storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec());
-            }
-            SourceLoader sourceLoader = null;
-            if (storedFieldsSpec.requiresSource()) {
-                sourceLoader = shardContexts.get(shard).newSourceLoader.get();
-                storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
-            }
-            storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
-                StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(ctx, null),
-                sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null
-            );
-            if (false == storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) {
-                trackStoredFields(storedFieldsSpec, false);
-            }
-        }
-
-        private void verifyBuilders(ComputeBlockLoaderFactory loaderBlockFactory, int shard) {
-            for (int f = 0; f < fields.length; f++) {
-                if (builders[f][shard] == null) {
-                    // Note that this relies on field.newShard() to set the loader and converter correctly for the current shard
-                    builders[f][shard] = (Block.Builder) fields[f].loader.builder(loaderBlockFactory, docs.getPositionCount());
-                    converters[f][shard] = fields[f].loader;
-                }
-            }
-        }
-
-        private void read(int doc, int shard) throws IOException {
-            storedFields.advanceTo(doc);
-            for (int f = 0; f < builders.length; f++) {
-                rowStride[f].read(doc, storedFields, builders[f][shard]);
-            }
-        }
-
-        @Override
-        public void close() {
-            Releasables.closeExpectNoException(fieldTypeBuilders);
-            for (int f = 0; f < fields.length; f++) {
-                Releasables.closeExpectNoException(builders[f]);
-            }
-        }
-    }
-
-    /**
-     * Is it more efficient to use a sequential stored field reader
-     * when reading stored fields for the documents contained in {@code docIds}?
-     */
-    private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) {
-        int count = docs.count();
-        if (count < SEQUENTIAL_BOUNDARY) {
-            return false;
-        }
-        int range = docs.get(count - 1) - docs.get(0);
-        return range * storedFieldsSequentialProportion <= count;
-    }
-
-    private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) {
-        readersBuilt.merge(
-            "stored_fields["
-                + "requires_source:"
-                + spec.requiresSource()
-                + ", fields:"
-                + spec.requiredStoredFields().size()
-                + ", sequential: "
-                + sequential
-                + "]",
-            1,
-            (prev, one) -> prev + one
-        );
-    }
-
-    private class FieldWork {
-        final FieldInfo info;
-
-        BlockLoader loader;
-        BlockLoader.ColumnAtATimeReader columnAtATime;
-        BlockLoader.RowStrideReader rowStride;
-
-        FieldWork(FieldInfo info) {
-            this.info = info;
-        }
-
-        void sameSegment(int firstDoc) {
-            if (columnAtATime != null && columnAtATime.canReuse(firstDoc) == false) {
-                columnAtATime = null;
-            }
-            if (rowStride != null && rowStride.canReuse(firstDoc) == false) {
-                rowStride = null;
-            }
-        }
-
-        void sameShardNewSegment() {
-            columnAtATime = null;
-            rowStride = null;
-        }
-
-        void newShard(int shard) {
-            loader = info.blockLoader.apply(shard);
-            columnAtATime = null;
-            rowStride = null;
-        }
-
-        BlockLoader.ColumnAtATimeReader columnAtATime(LeafReaderContext ctx) throws IOException {
-            if (columnAtATime == null) {
-                columnAtATime = loader.columnAtATimeReader(ctx);
-                trackReader("column_at_a_time", this.columnAtATime);
-            }
-            return columnAtATime;
-        }
-
-        BlockLoader.RowStrideReader rowStride(LeafReaderContext ctx) throws IOException {
-            if (rowStride == null) {
-                rowStride = loader.rowStrideReader(ctx);
-                trackReader("row_stride", this.rowStride);
-            }
-            return rowStride;
-        }
-
-        private void trackReader(String type, BlockLoader.Reader reader) {
-            readersBuilt.merge(info.name + ":" + type + ":" + reader, 1, (prev, one) -> prev + one);
-        }
-    }
-
-    private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int offset)
-        implements
-            Releasable {
-        void read(int doc, BlockLoaderStoredFieldsFromLeafLoader storedFields) throws IOException {
-            reader.read(doc, storedFields, builder);
-        }
-
-        Block build() {
-            return (Block) loader.convert(builder.build());
-        }
-
-        @Override
-        public void close() {
-            builder.close();
-        }
-    }
-
-    private LeafReaderContext ctx(int shard, int segment) {
-        return shardContexts.get(shard).reader().leaves().get(segment);
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("ValuesSourceReaderOperator[fields = [");
-        if (fields.length < 10) {
-            boolean first = true;
-            for (FieldWork f : fields) {
-                if (first) {
-                    first = false;
-                } else {
-                    sb.append(", ");
-                }
-                sb.append(f.info.name);
-            }
-        } else {
-            sb.append(fields.length).append(" fields");
-        }
-        return sb.append("]]").toString();
-    }
-
-    @Override
-    protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
-        return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded);
-    }
-
-    /**
-     * Quick checks for on the loaded block to make sure it looks reasonable.
-     * @param loader the object that did the loading - we use it to make error messages if the block is busted
-     * @param expectedPositions how many positions the block should have - it's as many as the incoming {@link Page} has
-     * @param block the block to sanity check
-     * @param field offset into the {@link #fields} array for the block being loaded
-     */
-    private void sanityCheckBlock(Object loader, int expectedPositions, Block block, int field) {
-        if (block.getPositionCount() != expectedPositions) {
-            throw new IllegalStateException(
-                sanityCheckBlockErrorPrefix(loader, block, field)
-                    + " has ["
-                    + block.getPositionCount()
-                    + "] positions instead of ["
-                    + expectedPositions
-                    + "]"
-            );
-        }
-        if (block.elementType() != ElementType.NULL && block.elementType() != fields[field].info.type) {
-            throw new IllegalStateException(
-                sanityCheckBlockErrorPrefix(loader, block, field)
-                    + "'s element_type ["
-                    + block.elementType()
-                    + "] NOT IN (NULL, "
-                    + fields[field].info.type
-                    + ")"
-            );
-        }
-    }
-
-    private String sanityCheckBlockErrorPrefix(Object loader, Block block, int field) {
-        return fields[field].info.name + "[" + loader + "]: " + block;
-    }
-
-    public static class Status extends AbstractPageMappingOperator.Status {
-        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
-            Operator.Status.class,
-            "values_source_reader",
-            Status::new
-        );
-
-        private final Map<String, Integer> readersBuilt;
-        private final long valuesLoaded;
-
-        Status(
-            Map<String, Integer> readersBuilt,
-            long processNanos,
-            int pagesProcessed,
-            long rowsReceived,
-            long rowsEmitted,
-            long valuesLoaded
-        ) {
-            super(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
-            this.readersBuilt = readersBuilt;
-            this.valuesLoaded = valuesLoaded;
-        }
-
-        Status(StreamInput in) throws IOException {
-            super(in);
-            readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt);
-            valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeMap(readersBuilt, StreamOutput::writeVInt);
-            if (supportsValuesLoaded(out.getTransportVersion())) {
-                out.writeVLong(valuesLoaded);
-            }
-        }
-
-        private static boolean supportsValuesLoaded(TransportVersion version) {
-            return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)
-                || version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19);
-        }
-
-        @Override
-        public String getWriteableName() {
-            return ENTRY.name;
-        }
-
-        public Map<String, Integer> readersBuilt() {
-            return readersBuilt;
-        }
-
-        @Override
-        public long valuesLoaded() {
-            return valuesLoaded;
-        }
-
-        @Override
-        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            builder.startObject();
-            builder.startObject("readers_built");
-            for (Map.Entry<String, Integer> e : readersBuilt.entrySet()) {
-                builder.field(e.getKey(), e.getValue());
-            }
-            builder.endObject();
-            builder.field("values_loaded", valuesLoaded);
-            innerToXContent(builder);
-            return builder.endObject();
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (super.equals(o) == false) return false;
-            Status status = (Status) o;
-            return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded);
-        }
-
-        @Override
-        public String toString() {
-            return Strings.toString(this);
-        }
-    }
-
-    private static class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
-        private final int pageSize;
-        private Block nullBlock;
-
-        private ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) {
-            super(factory);
-            this.pageSize = pageSize;
-        }
-
-        @Override
-        public Block constantNulls() {
-            if (nullBlock == null) {
-                nullBlock = factory.newConstantNullBlock(pageSize);
-            }
-            nullBlock.incRef();
-            return nullBlock;
-        }
-
-        @Override
-        public void close() {
-            if (nullBlock != null) {
-                nullBlock.close();
-            }
-        }
-
-        @Override
-        public BytesRefBlock constantBytes(BytesRef value) {
-            return factory.newConstantBytesRefBlockWith(value, pageSize);
-        }
-    }
-
-    public abstract static class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory {
-        protected final BlockFactory factory;
-
-        protected DelegatingBlockLoaderFactory(BlockFactory factory) {
-            this.factory = factory;
-        }
-
-        @Override
-        public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
-            return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
-        }
-
-        @Override
-        public BlockLoader.BooleanBuilder booleans(int expectedCount) {
-            return factory.newBooleanBlockBuilder(expectedCount);
-        }
-
-        @Override
-        public BlockLoader.BytesRefBuilder bytesRefsFromDocValues(int expectedCount) {
-            return factory.newBytesRefBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING);
-        }
-
-        @Override
-        public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) {
-            return factory.newBytesRefBlockBuilder(expectedCount);
-        }
-
-        @Override
-        public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) {
-            return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
-        }
-
-        @Override
-        public BlockLoader.DoubleBuilder doubles(int expectedCount) {
-            return factory.newDoubleBlockBuilder(expectedCount);
-        }
-
-        @Override
-        public BlockLoader.FloatBuilder denseVectors(int expectedVectorsCount, int dimensions) {
-            return factory.newFloatBlockBuilder(expectedVectorsCount * dimensions);
-        }
-
-        @Override
-        public BlockLoader.IntBuilder intsFromDocValues(int expectedCount) {
-            return factory.newIntBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
-        }
-
-        @Override
-        public BlockLoader.IntBuilder ints(int expectedCount) {
-            return factory.newIntBlockBuilder(expectedCount);
-        }
-
-        @Override
-        public BlockLoader.LongBuilder longsFromDocValues(int expectedCount) {
-            return factory.newLongBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
-        }
-
-        @Override
-        public BlockLoader.LongBuilder longs(int expectedCount) {
-            return factory.newLongBlockBuilder(expectedCount);
-        }
-
-        @Override
-        public BlockLoader.Builder nulls(int expectedCount) {
-            return ElementType.NULL.newBlockBuilder(expectedCount, factory);
-        }
-
-        @Override
-        public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
-            return new SingletonOrdinalsBuilder(factory, ordinals, count);
-        }
-
-        @Override
-        public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) {
-            return factory.newAggregateMetricDoubleBlockBuilder(count);
-        }
-    }
-}

+ 45 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ComputeBlockLoaderFactory.java

@@ -0,0 +1,45 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.core.Releasable;
+
+class ComputeBlockLoaderFactory extends DelegatingBlockLoaderFactory implements Releasable {
+    private final int pageSize;
+    private Block nullBlock;
+
+    ComputeBlockLoaderFactory(BlockFactory factory, int pageSize) {
+        super(factory);
+        this.pageSize = pageSize;
+    }
+
+    @Override
+    public Block constantNulls() {
+        if (nullBlock == null) {
+            nullBlock = factory.newConstantNullBlock(pageSize);
+        }
+        nullBlock.incRef();
+        return nullBlock;
+    }
+
+    @Override
+    public void close() {
+        if (nullBlock != null) {
+            nullBlock.close();
+        }
+    }
+
+    @Override
+    public BytesRefBlock constantBytes(BytesRef value) {
+        return factory.newConstantBytesRefBlockWith(value, pageSize);
+    }
+}

+ 93 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/DelegatingBlockLoaderFactory.java

@@ -0,0 +1,93 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.apache.lucene.index.SortedDocValues;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.SingletonOrdinalsBuilder;
+import org.elasticsearch.index.mapper.BlockLoader;
+
+public abstract class DelegatingBlockLoaderFactory implements BlockLoader.BlockFactory {
+    protected final BlockFactory factory;
+
+    protected DelegatingBlockLoaderFactory(BlockFactory factory) {
+        this.factory = factory;
+    }
+
+    @Override
+    public BlockLoader.BooleanBuilder booleansFromDocValues(int expectedCount) {
+        return factory.newBooleanBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
+    }
+
+    @Override
+    public BlockLoader.BooleanBuilder booleans(int expectedCount) {
+        return factory.newBooleanBlockBuilder(expectedCount);
+    }
+
+    @Override
+    public BlockLoader.BytesRefBuilder bytesRefsFromDocValues(int expectedCount) {
+        return factory.newBytesRefBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING);
+    }
+
+    @Override
+    public BlockLoader.BytesRefBuilder bytesRefs(int expectedCount) {
+        return factory.newBytesRefBlockBuilder(expectedCount);
+    }
+
+    @Override
+    public BlockLoader.DoubleBuilder doublesFromDocValues(int expectedCount) {
+        return factory.newDoubleBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
+    }
+
+    @Override
+    public BlockLoader.DoubleBuilder doubles(int expectedCount) {
+        return factory.newDoubleBlockBuilder(expectedCount);
+    }
+
+    @Override
+    public BlockLoader.FloatBuilder denseVectors(int expectedVectorsCount, int dimensions) {
+        return factory.newFloatBlockBuilder(expectedVectorsCount * dimensions);
+    }
+
+    @Override
+    public BlockLoader.IntBuilder intsFromDocValues(int expectedCount) {
+        return factory.newIntBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
+    }
+
+    @Override
+    public BlockLoader.IntBuilder ints(int expectedCount) {
+        return factory.newIntBlockBuilder(expectedCount);
+    }
+
+    @Override
+    public BlockLoader.LongBuilder longsFromDocValues(int expectedCount) {
+        return factory.newLongBlockBuilder(expectedCount).mvOrdering(Block.MvOrdering.SORTED_ASCENDING);
+    }
+
+    @Override
+    public BlockLoader.LongBuilder longs(int expectedCount) {
+        return factory.newLongBlockBuilder(expectedCount);
+    }
+
+    @Override
+    public BlockLoader.Builder nulls(int expectedCount) {
+        return ElementType.NULL.newBlockBuilder(expectedCount, factory);
+    }
+
+    @Override
+    public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
+        return new SingletonOrdinalsBuilder(factory, ordinals, count);
+    }
+
+    @Override
+    public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) {
+        return factory.newAggregateMetricDoubleBlockBuilder(count);
+    }
+}

+ 3 - 2
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesExtractFieldOperator.java → x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/TimeSeriesExtractFieldOperator.java

@@ -5,7 +5,7 @@
  * 2.0.
  */
 
-package org.elasticsearch.compute.lucene;
+package org.elasticsearch.compute.lucene.read;
 
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;
@@ -21,6 +21,7 @@ import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
 import org.elasticsearch.compute.data.OrdinalBytesRefVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.ShardContext;
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;
@@ -191,7 +192,7 @@ public class TimeSeriesExtractFieldOperator extends AbstractPageMappingOperator
         Releasables.close(fieldsReader, super::close);
     }
 
-    static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory {
+    static class BlockLoaderFactory extends DelegatingBlockLoaderFactory {
         BlockLoaderFactory(BlockFactory factory) {
             super(factory);
         }

+ 166 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromManyReader.java

@@ -0,0 +1,166 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.DocVector;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
+import org.elasticsearch.index.mapper.BlockLoader;
+import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
+import org.elasticsearch.index.mapper.SourceLoader;
+import org.elasticsearch.search.fetch.StoredFieldsSpec;
+
+import java.io.IOException;
+
+/**
+ * Loads values from a many leaves. Much less efficient than {@link ValuesFromSingleReader}.
+ */
+class ValuesFromManyReader extends ValuesReader {
+    private final int[] forwards;
+    private final int[] backwards;
+    private final BlockLoader.RowStrideReader[] rowStride;
+
+    private BlockLoaderStoredFieldsFromLeafLoader storedFields;
+
+    ValuesFromManyReader(ValuesSourceReaderOperator operator, DocVector docs) {
+        super(operator, docs);
+        forwards = docs.shardSegmentDocMapForwards();
+        backwards = docs.shardSegmentDocMapBackwards();
+        rowStride = new BlockLoader.RowStrideReader[operator.fields.length];
+    }
+
+    @Override
+    protected void load(Block[] target, int offset) throws IOException {
+        try (Run run = new Run(target)) {
+            run.run(offset);
+        }
+    }
+
+    class Run implements Releasable {
+        private final Block[] target;
+        private final Block.Builder[][] builders;
+        private final BlockLoader[][] converters;
+        private final Block.Builder[] fieldTypeBuilders;
+
+        Run(Block[] target) {
+            this.target = target;
+            fieldTypeBuilders = new Block.Builder[target.length];
+            builders = new Block.Builder[target.length][operator.shardContexts.size()];
+            converters = new BlockLoader[target.length][operator.shardContexts.size()];
+        }
+
+        void run(int offset) throws IOException {
+            assert offset == 0; // TODO allow non-0 offset to support splitting pages
+            for (int f = 0; f < operator.fields.length; f++) {
+                /*
+                 * Important note: each field has a desired type, which might not match the mapped type (in the case of union-types).
+                 * We create the final block builders using the desired type, one for each field, but then also use inner builders
+                 * (one for each field and shard), and converters (again one for each field and shard) to actually perform the field
+                 * loading in a way that is correct for the mapped field type, and then convert between that type and the desired type.
+                 */
+                fieldTypeBuilders[f] = operator.fields[f].info.type().newBlockBuilder(docs.getPositionCount(), operator.blockFactory);
+                builders[f] = new Block.Builder[operator.shardContexts.size()];
+                converters[f] = new BlockLoader[operator.shardContexts.size()];
+            }
+            try (
+                ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.getPositionCount())
+            ) {
+                int p = forwards[offset];
+                int shard = docs.shards().getInt(p);
+                int segment = docs.segments().getInt(p);
+                int firstDoc = docs.docs().getInt(p);
+                operator.positionFieldWork(shard, segment, firstDoc);
+                LeafReaderContext ctx = operator.ctx(shard, segment);
+                fieldsMoved(ctx, shard);
+                verifyBuilders(loaderBlockFactory, shard);
+                read(firstDoc, shard);
+
+                int i = offset + 1;
+                while (i < forwards.length) {
+                    p = forwards[i];
+                    shard = docs.shards().getInt(p);
+                    segment = docs.segments().getInt(p);
+                    boolean changedSegment = operator.positionFieldWorkDocGuaranteedAscending(shard, segment);
+                    if (changedSegment) {
+                        ctx = operator.ctx(shard, segment);
+                        fieldsMoved(ctx, shard);
+                    }
+                    verifyBuilders(loaderBlockFactory, shard);
+                    read(docs.docs().getInt(p), shard);
+                    i++;
+                }
+                buildBlocks();
+            }
+        }
+
+        private void buildBlocks() {
+            for (int f = 0; f < target.length; f++) {
+                for (int s = 0; s < operator.shardContexts.size(); s++) {
+                    if (builders[f][s] != null) {
+                        try (Block orig = (Block) converters[f][s].convert(builders[f][s].build())) {
+                            fieldTypeBuilders[f].copyFrom(orig, 0, orig.getPositionCount());
+                        }
+                    }
+                }
+                try (Block targetBlock = fieldTypeBuilders[f].build()) {
+                    target[f] = targetBlock.filter(backwards);
+                }
+                operator.sanityCheckBlock(rowStride[f], backwards.length, target[f], f);
+            }
+        }
+
+        private void verifyBuilders(ComputeBlockLoaderFactory loaderBlockFactory, int shard) {
+            for (int f = 0; f < operator.fields.length; f++) {
+                if (builders[f][shard] == null) {
+                    // Note that this relies on field.newShard() to set the loader and converter correctly for the current shard
+                    builders[f][shard] = (Block.Builder) operator.fields[f].loader.builder(loaderBlockFactory, docs.getPositionCount());
+                    converters[f][shard] = operator.fields[f].loader;
+                }
+            }
+        }
+
+        private void read(int doc, int shard) throws IOException {
+            storedFields.advanceTo(doc);
+            for (int f = 0; f < builders.length; f++) {
+                rowStride[f].read(doc, storedFields, builders[f][shard]);
+            }
+        }
+
+        @Override
+        public void close() {
+            Releasables.closeExpectNoException(fieldTypeBuilders);
+            for (int f = 0; f < operator.fields.length; f++) {
+                Releasables.closeExpectNoException(builders[f]);
+            }
+        }
+    }
+
+    private void fieldsMoved(LeafReaderContext ctx, int shard) throws IOException {
+        StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
+        for (int f = 0; f < operator.fields.length; f++) {
+            ValuesSourceReaderOperator.FieldWork field = operator.fields[f];
+            rowStride[f] = field.rowStride(ctx);
+            storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec());
+        }
+        SourceLoader sourceLoader = null;
+        if (storedFieldsSpec.requiresSource()) {
+            sourceLoader = operator.shardContexts.get(shard).newSourceLoader().get();
+            storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
+        }
+        storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
+            StoredFieldLoader.fromSpec(storedFieldsSpec).getLoader(ctx, null),
+            sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null
+        );
+        if (false == storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) {
+            operator.trackStoredFields(storedFieldsSpec, false);
+        }
+    }
+}

+ 194 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesFromSingleReader.java

@@ -0,0 +1,194 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.DocVector;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
+import org.elasticsearch.index.mapper.BlockLoader;
+import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
+import org.elasticsearch.index.mapper.SourceLoader;
+import org.elasticsearch.search.fetch.StoredFieldsSpec;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Loads values from a single leaf. Much more efficient than {@link ValuesFromManyReader}.
+ */
+class ValuesFromSingleReader extends ValuesReader {
+    /**
+     * Minimum number of documents for which it is more efficient to use a
+     * sequential stored field reader when reading stored fields.
+     * <p>
+     *     The sequential stored field reader decompresses a whole block of docs
+     *     at a time so for very short lists it won't be faster to use it. We use
+     *     {@code 10} documents as the boundary for "very short" because it's what
+     *     search does, not because we've done extensive testing on the number.
+     * </p>
+     */
+    static final int SEQUENTIAL_BOUNDARY = 10;
+
+    private final int shard;
+    private final int segment;
+
+    ValuesFromSingleReader(ValuesSourceReaderOperator operator, DocVector docs) {
+        super(operator, docs);
+        this.shard = docs.shards().getInt(0);
+        this.segment = docs.segments().getInt(0);
+    }
+
+    @Override
+    protected void load(Block[] target, int offset) throws IOException {
+        assert offset == 0; // TODO allow non-0 offset to support splitting pages
+        if (docs.singleSegmentNonDecreasing()) {
+            loadFromSingleLeaf(target, new BlockLoader.Docs() {
+                @Override
+                public int count() {
+                    return docs.getPositionCount();
+                }
+
+                @Override
+                public int get(int i) {
+                    return docs.docs().getInt(i);
+                }
+            });
+            return;
+        }
+        int[] forwards = docs.shardSegmentDocMapForwards();
+        loadFromSingleLeaf(target, new BlockLoader.Docs() {
+            @Override
+            public int count() {
+                return docs.getPositionCount();
+            }
+
+            @Override
+            public int get(int i) {
+                return docs.docs().getInt(forwards[i]);
+            }
+        });
+        final int[] backwards = docs.shardSegmentDocMapBackwards();
+        for (int i = 0; i < target.length; i++) {
+            try (Block in = target[i]) {
+                target[i] = in.filter(backwards);
+            }
+        }
+    }
+
+    private void loadFromSingleLeaf(Block[] target, BlockLoader.Docs docs) throws IOException {
+        int firstDoc = docs.get(0);
+        operator.positionFieldWork(shard, segment, firstDoc);
+        StoredFieldsSpec storedFieldsSpec = StoredFieldsSpec.NO_REQUIREMENTS;
+        List<RowStrideReaderWork> rowStrideReaders = new ArrayList<>(operator.fields.length);
+        LeafReaderContext ctx = operator.ctx(shard, segment);
+        try (ComputeBlockLoaderFactory loaderBlockFactory = new ComputeBlockLoaderFactory(operator.blockFactory, docs.count())) {
+            for (int f = 0; f < operator.fields.length; f++) {
+                ValuesSourceReaderOperator.FieldWork field = operator.fields[f];
+                BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx);
+                if (columnAtATime != null) {
+                    target[f] = (Block) columnAtATime.read(loaderBlockFactory, docs);
+                    operator.sanityCheckBlock(columnAtATime, docs.count(), target[f], f);
+                } else {
+                    rowStrideReaders.add(
+                        new RowStrideReaderWork(
+                            field.rowStride(ctx),
+                            (Block.Builder) field.loader.builder(loaderBlockFactory, docs.count()),
+                            field.loader,
+                            f
+                        )
+                    );
+                    storedFieldsSpec = storedFieldsSpec.merge(field.loader.rowStrideStoredFieldSpec());
+                }
+            }
+
+            if (rowStrideReaders.isEmpty() == false) {
+                loadFromRowStrideReaders(target, storedFieldsSpec, rowStrideReaders, ctx, docs);
+            }
+        } finally {
+            Releasables.close(rowStrideReaders);
+        }
+    }
+
+    private void loadFromRowStrideReaders(
+        Block[] target,
+        StoredFieldsSpec storedFieldsSpec,
+        List<RowStrideReaderWork> rowStrideReaders,
+        LeafReaderContext ctx,
+        BlockLoader.Docs docs
+    ) throws IOException {
+        SourceLoader sourceLoader = null;
+        ValuesSourceReaderOperator.ShardContext shardContext = operator.shardContexts.get(shard);
+        if (storedFieldsSpec.requiresSource()) {
+            sourceLoader = shardContext.newSourceLoader().get();
+            storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
+        }
+        if (storedFieldsSpec.equals(StoredFieldsSpec.NO_REQUIREMENTS)) {
+            throw new IllegalStateException(
+                "found row stride readers [" + rowStrideReaders + "] without stored fields [" + storedFieldsSpec + "]"
+            );
+        }
+        StoredFieldLoader storedFieldLoader;
+        if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) {
+            storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec);
+            operator.trackStoredFields(storedFieldsSpec, true);
+        } else {
+            storedFieldLoader = StoredFieldLoader.fromSpec(storedFieldsSpec);
+            operator.trackStoredFields(storedFieldsSpec, false);
+        }
+        BlockLoaderStoredFieldsFromLeafLoader storedFields = new BlockLoaderStoredFieldsFromLeafLoader(
+            storedFieldLoader.getLoader(ctx, null),
+            sourceLoader != null ? sourceLoader.leaf(ctx.reader(), null) : null
+        );
+        int p = 0;
+        while (p < docs.count()) {
+            int doc = docs.get(p++);
+            storedFields.advanceTo(doc);
+            for (RowStrideReaderWork work : rowStrideReaders) {
+                work.read(doc, storedFields);
+            }
+        }
+        for (RowStrideReaderWork work : rowStrideReaders) {
+            target[work.offset] = work.build();
+            operator.sanityCheckBlock(work.reader, p, target[work.offset], work.offset);
+        }
+    }
+
+    /**
+     * Is it more efficient to use a sequential stored field reader
+     * when reading stored fields for the documents contained in {@code docIds}?
+     */
+    private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) {
+        int count = docs.count();
+        if (count < SEQUENTIAL_BOUNDARY) {
+            return false;
+        }
+        int range = docs.get(count - 1) - docs.get(0);
+        return range * storedFieldsSequentialProportion <= count;
+    }
+
+    private record RowStrideReaderWork(BlockLoader.RowStrideReader reader, Block.Builder builder, BlockLoader loader, int offset)
+        implements
+            Releasable {
+        void read(int doc, BlockLoaderStoredFieldsFromLeafLoader storedFields) throws IOException {
+            reader.read(doc, storedFields, builder);
+        }
+
+        Block build() {
+            return (Block) loader.convert(builder.build());
+        }
+
+        @Override
+        public void close() {
+            builder.close();
+        }
+    }
+}
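
For reference, useSequentialStoredFieldsReader above is a density heuristic: sequential decompression of whole stored-field blocks only pays off once the doc list is long and dense. A standalone sketch of the same arithmetic, assuming ascending doc ids (plain Java, not the Elasticsearch API):

    class SequentialReaderSketch {
        static final int SEQUENTIAL_BOUNDARY = 10;

        // True when the doc ids are numerous and dense enough that decompressing
        // whole stored-field blocks sequentially should beat random access.
        static boolean useSequentialReader(int[] ascendingDocIds, double sequentialProportion) {
            int count = ascendingDocIds.length;
            if (count < SEQUENTIAL_BOUNDARY) {
                return false;
            }
            int range = ascendingDocIds[count - 1] - ascendingDocIds[0];
            return range * sequentialProportion <= count;
        }
    }

With a proportion of 0.2, for example, 100 docs spread over at most 500 consecutive doc ids would take the sequential path.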

+ 58 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesReader.java

@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.DocVector;
+import org.elasticsearch.core.ReleasableIterator;
+import org.elasticsearch.core.Releasables;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+public abstract class ValuesReader implements ReleasableIterator<Block[]> {
+    protected final ValuesSourceReaderOperator operator;
+    protected final DocVector docs;
+    private int offset;
+
+    ValuesReader(ValuesSourceReaderOperator operator, DocVector docs) {
+        this.operator = operator;
+        this.docs = docs;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return offset < docs.getPositionCount();
+    }
+
+    @Override
+    public Block[] next() {
+        Block[] target = new Block[operator.fields.length];
+        boolean success = false;
+        try {
+            load(target, offset);
+            success = true;
+            for (Block b : target) {
+                operator.valuesLoaded += b.getTotalValueCount();
+            }
+            offset += target[0].getPositionCount();
+            return target;
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        } finally {
+            if (success == false) {
+                Releasables.closeExpectNoException(target);
+            }
+        }
+    }
+
+    protected abstract void load(Block[] target, int offset) throws IOException;
+
+    @Override
+    public void close() {}
+}
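
For reference, ValuesReader.next() hands ownership of the whole Block array to the caller only after every block is built, and releases anything already built if load() throws. A generic sketch of that pattern with plain AutoCloseable (illustrative, not the compute API):

    import java.util.function.IntFunction;

    class LoadAllOrReleaseSketch {
        static <T extends AutoCloseable> T[] loadAll(int n, IntFunction<T> loader, IntFunction<T[]> newArray) throws Exception {
            T[] target = newArray.apply(n);
            boolean success = false;
            try {
                for (int i = 0; i < n; i++) {
                    target[i] = loader.apply(i);   // may fail part-way through
                }
                success = true;
                return target;                     // caller now owns every element
            } finally {
                if (success == false) {
                    for (T t : target) {           // close anything already built
                        if (t != null) {
                            t.close();
                        }
                    }
                }
            }
        }
    }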

+ 306 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperator.java

@@ -0,0 +1,306 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.LeafReaderContext;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.DocBlock;
+import org.elasticsearch.compute.data.DocVector;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.LuceneSourceOperator;
+import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.Operator;
+import org.elasticsearch.core.ReleasableIterator;
+import org.elasticsearch.index.mapper.BlockLoader;
+import org.elasticsearch.index.mapper.SourceLoader;
+import org.elasticsearch.search.fetch.StoredFieldsSpec;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+
+/**
+ * Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator}
+ * and outputs them to a new column.
+ */
+public class ValuesSourceReaderOperator extends AbstractPageMappingToIteratorOperator {
+    /**
+     * Creates a factory for {@link ValuesSourceReaderOperator}.
+     * @param fields fields to load
+     * @param shardContexts per-shard loading information
+     * @param docChannel the channel containing the shard, leaf/segment and doc id
+     */
+    public record Factory(List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) implements OperatorFactory {
+        public Factory {
+            if (fields.isEmpty()) {
+                throw new IllegalStateException("ValuesSourceReaderOperator doesn't support empty fields");
+            }
+        }
+
+        @Override
+        public Operator get(DriverContext driverContext) {
+            return new ValuesSourceReaderOperator(driverContext.blockFactory(), fields, shardContexts, docChannel);
+        }
+
+        @Override
+        public String describe() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("ValuesSourceReaderOperator[fields = [");
+            if (fields.size() < 10) {
+                boolean first = true;
+                for (FieldInfo f : fields) {
+                    if (first) {
+                        first = false;
+                    } else {
+                        sb.append(", ");
+                    }
+                    sb.append(f.name);
+                }
+            } else {
+                sb.append(fields.size()).append(" fields");
+            }
+            return sb.append("]]").toString();
+        }
+    }
+
+    /**
+     * Configuration for a field to load.
+     *
+     * {@code blockLoader} maps shard index to the {@link BlockLoader}s
+     * which load the actual blocks.
+     */
+    public record FieldInfo(String name, ElementType type, IntFunction<BlockLoader> blockLoader) {}
+
+    public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader, double storedFieldsSequentialProportion) {}
+
+    final FieldWork[] fields;
+    final List<ShardContext> shardContexts;
+    private final int docChannel;
+    final BlockFactory blockFactory;
+
+    private final Map<String, Integer> readersBuilt = new TreeMap<>();
+    long valuesLoaded;
+
+    private int lastShard = -1;
+    private int lastSegment = -1;
+
+    /**
+     * Creates a new extractor
+     * @param fields fields to load
+     * @param docChannel the channel containing the shard, leaf/segment and doc id
+     */
+    public ValuesSourceReaderOperator(BlockFactory blockFactory, List<FieldInfo> fields, List<ShardContext> shardContexts, int docChannel) {
+        if (fields.isEmpty()) {
+            throw new IllegalStateException("ValuesSourceReaderOperator doesn't support empty fields");
+        }
+        this.fields = fields.stream().map(FieldWork::new).toArray(FieldWork[]::new);
+        this.shardContexts = shardContexts;
+        this.docChannel = docChannel;
+        this.blockFactory = blockFactory;
+    }
+
+    @Override
+    protected ReleasableIterator<Page> receive(Page page) {
+        DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
+        return appendBlockArrays(
+            page,
+            docVector.singleSegment() ? new ValuesFromSingleReader(this, docVector) : new ValuesFromManyReader(this, docVector)
+        );
+    }
+
+    void positionFieldWork(int shard, int segment, int firstDoc) {
+        if (lastShard == shard) {
+            if (lastSegment == segment) {
+                for (FieldWork w : fields) {
+                    w.sameSegment(firstDoc);
+                }
+                return;
+            }
+            lastSegment = segment;
+            for (FieldWork w : fields) {
+                w.sameShardNewSegment();
+            }
+            return;
+        }
+        lastShard = shard;
+        lastSegment = segment;
+        for (FieldWork w : fields) {
+            w.newShard(shard);
+        }
+    }
+
+    boolean positionFieldWorkDocGuaranteedAscending(int shard, int segment) {
+        if (lastShard == shard) {
+            if (lastSegment == segment) {
+                return false;
+            }
+            lastSegment = segment;
+            for (FieldWork w : fields) {
+                w.sameShardNewSegment();
+            }
+            return true;
+        }
+        lastShard = shard;
+        lastSegment = segment;
+        for (FieldWork w : fields) {
+            w.newShard(shard);
+        }
+        return true;
+    }
+
+    void trackStoredFields(StoredFieldsSpec spec, boolean sequential) {
+        readersBuilt.merge(
+            "stored_fields["
+                + "requires_source:"
+                + spec.requiresSource()
+                + ", fields:"
+                + spec.requiredStoredFields().size()
+                + ", sequential: "
+                + sequential
+                + "]",
+            1,
+            (prev, one) -> prev + one
+        );
+    }
+
+    protected class FieldWork {
+        final FieldInfo info;
+
+        BlockLoader loader;
+        BlockLoader.ColumnAtATimeReader columnAtATime;
+        BlockLoader.RowStrideReader rowStride;
+
+        FieldWork(FieldInfo info) {
+            this.info = info;
+        }
+
+        void sameSegment(int firstDoc) {
+            if (columnAtATime != null && columnAtATime.canReuse(firstDoc) == false) {
+                columnAtATime = null;
+            }
+            if (rowStride != null && rowStride.canReuse(firstDoc) == false) {
+                rowStride = null;
+            }
+        }
+
+        void sameShardNewSegment() {
+            columnAtATime = null;
+            rowStride = null;
+        }
+
+        void newShard(int shard) {
+            loader = info.blockLoader.apply(shard);
+            columnAtATime = null;
+            rowStride = null;
+        }
+
+        BlockLoader.ColumnAtATimeReader columnAtATime(LeafReaderContext ctx) throws IOException {
+            if (columnAtATime == null) {
+                columnAtATime = loader.columnAtATimeReader(ctx);
+                trackReader("column_at_a_time", this.columnAtATime);
+            }
+            return columnAtATime;
+        }
+
+        BlockLoader.RowStrideReader rowStride(LeafReaderContext ctx) throws IOException {
+            if (rowStride == null) {
+                rowStride = loader.rowStrideReader(ctx);
+                trackReader("row_stride", this.rowStride);
+            }
+            return rowStride;
+        }
+
+        private void trackReader(String type, BlockLoader.Reader reader) {
+            readersBuilt.merge(info.name + ":" + type + ":" + reader, 1, (prev, one) -> prev + one);
+        }
+    }
+
+    LeafReaderContext ctx(int shard, int segment) {
+        return shardContexts.get(shard).reader().leaves().get(segment);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ValuesSourceReaderOperator[fields = [");
+        if (fields.length < 10) {
+            boolean first = true;
+            for (FieldWork f : fields) {
+                if (first) {
+                    first = false;
+                } else {
+                    sb.append(", ");
+                }
+                sb.append(f.info.name);
+            }
+        } else {
+            sb.append(fields.length).append(" fields");
+        }
+        return sb.append("]]").toString();
+    }
+
+    @Override
+    protected ValuesSourceReaderOperatorStatus status(
+        long processNanos,
+        int pagesReceived,
+        int pagesEmitted,
+        long rowsReceived,
+        long rowsEmitted
+    ) {
+        return new ValuesSourceReaderOperatorStatus(
+            new TreeMap<>(readersBuilt),
+            processNanos,
+            pagesReceived,
+            pagesEmitted,
+            rowsReceived,
+            rowsEmitted,
+            valuesLoaded
+        );
+    }
+
+    /**
+     * Quick checks on the loaded block to make sure it looks reasonable.
+     * @param loader the object that did the loading - we use it to make error messages if the block is busted
+     * @param expectedPositions how many positions the block should have - it's as many as the incoming {@link Page} has
+     * @param block the block to sanity check
+     * @param field offset into the {@link #fields} array for the block being loaded
+     */
+    void sanityCheckBlock(Object loader, int expectedPositions, Block block, int field) {
+        if (block.getPositionCount() != expectedPositions) {
+            throw new IllegalStateException(
+                sanityCheckBlockErrorPrefix(loader, block, field)
+                    + " has ["
+                    + block.getPositionCount()
+                    + "] positions instead of ["
+                    + expectedPositions
+                    + "]"
+            );
+        }
+        if (block.elementType() != ElementType.NULL && block.elementType() != fields[field].info.type) {
+            throw new IllegalStateException(
+                sanityCheckBlockErrorPrefix(loader, block, field)
+                    + "'s element_type ["
+                    + block.elementType()
+                    + "] NOT IN (NULL, "
+                    + fields[field].info.type
+                    + ")"
+            );
+        }
+    }
+
+    private String sanityCheckBlockErrorPrefix(Object loader, Block block, int field) {
+        return fields[field].info.name + "[" + loader + "]: " + block;
+    }
+}
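
For reference, positionFieldWork above reuses per-segment readers only while both shard and segment are unchanged, and reuses the BlockLoader for as long as the shard is unchanged. A small sketch of those reuse rules with illustrative names, not the real FieldWork:

    class ReaderReuseSketch {
        private int lastShard = -1;
        private int lastSegment = -1;

        void position(int shard, int segment) {
            if (lastShard == shard && lastSegment == segment) {
                return;                  // same leaf: readers may be reused, subject to canReuse(firstDoc)
            }
            if (lastShard == shard) {
                dropSegmentReaders();    // same shard, new segment: keep the loader, rebuild readers
            } else {
                newShard(shard);         // new shard: rebuild the loader and the readers
            }
            lastShard = shard;
            lastSegment = segment;
        }

        private void dropSegmentReaders() { /* columnAtATime = null; rowStride = null; */ }

        private void newShard(int shard) { /* loader = blockLoader.apply(shard); then dropSegmentReaders(); */ }
    }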

+ 159 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatus.java

@@ -0,0 +1,159 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.lucene.read;
+
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
+import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator;
+import org.elasticsearch.compute.operator.Operator;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;
+import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
+import static org.elasticsearch.TransportVersions.ESQL_SPLIT_ON_BIG_VALUES;
+import static org.elasticsearch.TransportVersions.ESQL_SPLIT_ON_BIG_VALUES_8_19;
+import static org.elasticsearch.TransportVersions.ESQL_SPLIT_ON_BIG_VALUES_9_1;
+
+public class ValuesSourceReaderOperatorStatus extends AbstractPageMappingToIteratorOperator.Status {
+    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+        Operator.Status.class,
+        "values_source_reader",
+        ValuesSourceReaderOperatorStatus::readFrom
+    );
+
+    private final Map<String, Integer> readersBuilt;
+    private final long valuesLoaded;
+
+    public ValuesSourceReaderOperatorStatus(
+        Map<String, Integer> readersBuilt,
+        long processNanos,
+        int pagesReceived,
+        int pagesEmitted,
+        long rowsReceived,
+        long rowsEmitted,
+        long valuesLoaded
+    ) {
+        super(processNanos, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted);
+        this.readersBuilt = readersBuilt;
+        this.valuesLoaded = valuesLoaded;
+    }
+
+    static ValuesSourceReaderOperatorStatus readFrom(StreamInput in) throws IOException {
+        long processNanos;
+        int pagesReceived;
+        int pagesEmitted;
+        long rowsReceived;
+        long rowsEmitted;
+        if (supportsSplitOnBigValues(in.getTransportVersion())) {
+            AbstractPageMappingToIteratorOperator.Status status = new AbstractPageMappingToIteratorOperator.Status(in);
+            processNanos = status.processNanos();
+            pagesReceived = status.pagesReceived();
+            pagesEmitted = status.pagesEmitted();
+            rowsReceived = status.rowsReceived();
+            rowsEmitted = status.rowsEmitted();
+        } else {
+            AbstractPageMappingOperator.Status status = new AbstractPageMappingOperator.Status(in);
+            processNanos = status.processNanos();
+            pagesReceived = status.pagesProcessed();
+            pagesEmitted = status.pagesProcessed();
+            rowsReceived = status.rowsReceived();
+            rowsEmitted = status.rowsEmitted();
+        }
+        Map<String, Integer> readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt);
+        long valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0;
+        return new ValuesSourceReaderOperatorStatus(
+            readersBuilt,
+            processNanos,
+            pagesReceived,
+            pagesEmitted,
+            rowsReceived,
+            rowsEmitted,
+            valuesLoaded
+        );
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        if (supportsSplitOnBigValues(out.getTransportVersion())) {
+            super.writeTo(out);
+        } else {
+            /*
+             * Before we knew how to split pages when reading large values
+             * our status only carried a single page count - just like AbstractPageMappingOperator.Status.
+             */
+            new AbstractPageMappingOperator.Status(processNanos(), pagesEmitted(), rowsReceived(), rowsEmitted()).writeTo(out);
+        }
+        out.writeMap(readersBuilt, StreamOutput::writeVInt);
+        if (supportsValuesLoaded(out.getTransportVersion())) {
+            out.writeVLong(valuesLoaded);
+        }
+    }
+
+    private static boolean supportsSplitOnBigValues(TransportVersion version) {
+        return version.onOrAfter(ESQL_SPLIT_ON_BIG_VALUES)
+            || version.isPatchFrom(ESQL_SPLIT_ON_BIG_VALUES_9_1)
+            || version.isPatchFrom(ESQL_SPLIT_ON_BIG_VALUES_8_19);
+    }
+
+    private static boolean supportsValuesLoaded(TransportVersion version) {
+        return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)
+            || version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19);
+    }
+
+    @Override
+    public String getWriteableName() {
+        return ENTRY.name;
+    }
+
+    public Map<String, Integer> readersBuilt() {
+        return readersBuilt;
+    }
+
+    @Override
+    public long valuesLoaded() {
+        return valuesLoaded;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.startObject("readers_built");
+        for (Map.Entry<String, Integer> e : readersBuilt.entrySet()) {
+            builder.field(e.getKey(), e.getValue());
+        }
+        builder.endObject();
+        builder.field("values_loaded", valuesLoaded);
+        innerToXContent(builder);
+        return builder.endObject();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (super.equals(o) == false) return false;
+        ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) o;
+        return readersBuilt.equals(status.readersBuilt) && valuesLoaded == status.valuesLoaded;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), readersBuilt, valuesLoaded);
+    }
+
+    @Override
+    public String toString() {
+        return Strings.toString(this);
+    }
+}
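
For reference, readFrom above maps the old single pages_processed count onto both pages_received and pages_emitted when reading from older nodes, and writeTo collapses back to a single count when writing to them. A tiny sketch of that mapping with simplified records, not the actual wire format:

    class StatusBwcSketch {
        record OldStatus(int pagesProcessed) {}

        record NewStatus(int pagesReceived, int pagesEmitted) {
            static NewStatus fromOld(OldStatus old) {
                // The old operator mapped one input page to one output page, so a single count covers both.
                return new NewStatus(old.pagesProcessed(), old.pagesProcessed());
            }

            OldStatus toOld() {
                // Writing to an old node: send the emitted count as the single pages_processed value.
                return new OldStatus(pagesEmitted());
            }
        }
    }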

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java

@@ -127,7 +127,7 @@ public abstract class AbstractPageMappingOperator implements Operator {
             this.rowsEmitted = rowsEmitted;
         }
 
-        protected Status(StreamInput in) throws IOException {
+        public Status(StreamInput in) throws IOException {
             processNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0;
             pagesProcessed = in.readVInt();
             if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {

+ 49 - 13
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java

@@ -64,13 +64,38 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator
      */
     protected abstract ReleasableIterator<Page> receive(Page page);
 
+    /**
+     * Append an {@link Iterator} of arrays of {@link Block}s to a
+     * {@link Page}, one after the other. It's required that the
+     * iterator emit as many <strong>positions</strong> as there were
+     * in the page.
+     */
+    public static ReleasableIterator<Page> appendBlockArrays(Page page, ReleasableIterator<Block[]> toAdd) {
+        return new AppendBlocksIterator(page, toAdd);
+    }
+
     /**
      * Append an {@link Iterator} of {@link Block}s to a {@link Page}, one
      * after the other. It's required that the iterator emit as many
      * <strong>positions</strong> as there were in the page.
      */
     public static ReleasableIterator<Page> appendBlocks(Page page, ReleasableIterator<? extends Block> toAdd) {
-        return new AppendBlocksIterator(page, toAdd);
+        return appendBlockArrays(page, new ReleasableIterator<>() {
+            @Override
+            public boolean hasNext() {
+                return toAdd.hasNext();
+            }
+
+            @Override
+            public Block[] next() {
+                return new Block[] { toAdd.next() };
+            }
+
+            @Override
+            public void close() {
+                toAdd.close();
+            }
+        });
     }
 
     @Override
@@ -86,13 +111,24 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator
         if (next != null) {
             assert next.hasNext() == false : "has pending input page";
             next.close();
+            next = null;
         }
         if (page.getPositionCount() == 0) {
             return;
         }
-        next = new RuntimeTrackingIterator(receive(page));
-        pagesReceived++;
-        rowsReceived += page.getPositionCount();
+        try {
+            next = new RuntimeTrackingIterator(receive(page));
+            pagesReceived++;
+            rowsReceived += page.getPositionCount();
+        } finally {
+            if (next == null) {
+                /*
+                 * The `receive` operation failed, we need to release the incoming page
+                 * because it's no longer owned by anyone.
+                 */
+                page.releaseBlocks();
+            }
+        }
     }
 
     @Override
@@ -183,7 +219,7 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator
             this.rowsEmitted = rowsEmitted;
         }
 
-        protected Status(StreamInput in) throws IOException {
+        public Status(StreamInput in) throws IOException {
             processNanos = in.readVLong();
             pagesReceived = in.readVInt();
             pagesEmitted = in.readVInt();
@@ -284,11 +320,11 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator
 
     private static class AppendBlocksIterator implements ReleasableIterator<Page> {
         private final Page page;
-        private final ReleasableIterator<? extends Block> next;
+        private final ReleasableIterator<Block[]> next;
 
         private int positionOffset;
 
-        protected AppendBlocksIterator(Page page, ReleasableIterator<? extends Block> next) {
+        protected AppendBlocksIterator(Page page, ReleasableIterator<Block[]> next) {
             this.page = page;
             this.next = next;
         }
@@ -305,17 +341,17 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator
 
         @Override
         public final Page next() {
-            Block read = next.next();
+            Block[] read = next.next();
             int start = positionOffset;
-            positionOffset += read.getPositionCount();
-            if (start == 0 && read.getPositionCount() == page.getPositionCount()) {
+            positionOffset += read[0].getPositionCount();
+            if (start == 0 && read[0].getPositionCount() == page.getPositionCount()) {
                 for (int b = 0; b < page.getBlockCount(); b++) {
                     page.getBlock(b).incRef();
                 }
-                return page.appendBlock(read);
+                return page.appendBlocks(read);
             }
-            Block[] newBlocks = new Block[page.getBlockCount() + 1];
-            newBlocks[page.getBlockCount()] = read;
+            Block[] newBlocks = new Block[page.getBlockCount() + read.length];
+            System.arraycopy(read, 0, newBlocks, page.getBlockCount(), read.length);
             try {
                 // TODO a way to filter with a range please.
                 int[] positions = IntStream.range(start, positionOffset).toArray();

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ColumnLoadOperator.java

@@ -14,7 +14,7 @@ import org.elasticsearch.core.ReleasableIterator;
 
 /**
  * {@link Block#lookup Looks up} values from a provided {@link Block} and
- * mergeds them into each {@link Page}.
+ * merges them into each {@link Page}.
  */
 public class ColumnLoadOperator extends AbstractPageMappingToIteratorOperator {
     public record Values(String name, Block block) {

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java

@@ -32,7 +32,7 @@ import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;

+ 1 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -52,7 +52,7 @@ import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests;
 import org.elasticsearch.compute.lucene.ShardContext;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java

@@ -32,6 +32,7 @@ import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.data.Vector;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;

+ 1 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;

+ 2 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java

@@ -21,6 +21,7 @@ import org.apache.lucene.tests.index.RandomIndexWriter;
 import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.test.OperatorTestCase;
@@ -52,7 +53,7 @@ public class LuceneTopNSourceOperatorScoringTests extends LuceneTopNSourceOperat
     private IndexReader reader;
 
     @After
-    private void closeIndex() throws IOException {
+    public void closeScoringIndex() throws IOException {
         IOUtils.close(reader, directory);
     }
 

+ 2 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.compute.data.DoubleBlock;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.test.AnyOperatorTestCase;
@@ -57,7 +58,7 @@ public class LuceneTopNSourceOperatorTests extends AnyOperatorTestCase {
     private IndexReader reader;
 
     @After
-    private void closeIndex() throws IOException {
+    public void closeIndex() throws IOException {
         IOUtils.close(reader, directory);
     }
 

+ 3 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java

@@ -34,6 +34,9 @@ import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.read.TimeSeriesExtractFieldOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.DriverStatus;

+ 13 - 4
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java → x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValueSourceReaderTypeConversionTests.java

@@ -5,7 +5,7 @@
  * 2.0.
  */
 
-package org.elasticsearch.compute.lucene;
+package org.elasticsearch.compute.lucene.read;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.DoubleDocValuesField;
@@ -48,6 +48,12 @@ import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.DataPartitioning;
+import org.elasticsearch.compute.lucene.LuceneOperator;
+import org.elasticsearch.compute.lucene.LuceneSliceQueue;
+import org.elasticsearch.compute.lucene.LuceneSourceOperator;
+import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests;
+import org.elasticsearch.compute.lucene.ShardContext;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.DriverRunner;
@@ -629,7 +635,9 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
             }
         }
         for (Operator op : operators) {
-            assertThat(((ValuesSourceReaderOperator) op).status().pagesProcessed(), equalTo(input.size()));
+            ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status();
+            assertThat(status.pagesReceived(), equalTo(input.size()));
+            assertThat(status.pagesEmitted(), equalTo(input.size()));
         }
         assertDriverContext(driverContext);
     }
@@ -716,8 +724,9 @@ public class ValueSourceReaderTypeConversionTests extends AnyOperatorTestCase {
         }
         drive(operators, input.iterator(), driverContext);
         for (int i = 0; i < cases.size(); i++) {
-            ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) operators.get(i).status();
-            assertThat(status.pagesProcessed(), equalTo(input.size()));
+            ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status();
+            assertThat(status.pagesReceived(), equalTo(input.size()));
+            assertThat(status.pagesEmitted(), equalTo(input.size()));
             FieldCase fc = cases.get(i);
             fc.checkReaders.check(fc.info.name(), allInOnePage, input.size(), totalSize, status.readersBuilt());
         }

+ 29 - 17
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java → x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorStatusTests.java

@@ -5,7 +5,7 @@
  * 2.0.
  */
 
-package org.elasticsearch.compute.lucene;
+package org.elasticsearch.compute.lucene.read;
 
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -18,9 +18,9 @@ import java.util.TreeMap;
 
 import static org.hamcrest.Matchers.equalTo;
 
-public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase<ValuesSourceReaderOperator.Status> {
-    public static ValuesSourceReaderOperator.Status simple() {
-        return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123, 111, 222, 1000);
+public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase<ValuesSourceReaderOperatorStatus> {
+    public static ValuesSourceReaderOperatorStatus simple() {
+        return new ValuesSourceReaderOperatorStatus(Map.of("ReaderType", 3), 1022323, 123, 200, 111, 222, 1000);
     }
 
     public static String simpleToJson() {
@@ -32,7 +32,8 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi
               "values_loaded" : 1000,
               "process_nanos" : 1022323,
               "process_time" : "1ms",
-              "pages_processed" : 123,
+              "pages_received" : 123,
+              "pages_emitted" : 200,
               "rows_received" : 111,
               "rows_emitted" : 222
             }""";
@@ -43,16 +44,17 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi
     }
 
     @Override
-    protected Writeable.Reader<ValuesSourceReaderOperator.Status> instanceReader() {
-        return ValuesSourceReaderOperator.Status::new;
+    protected Writeable.Reader<ValuesSourceReaderOperatorStatus> instanceReader() {
+        return ValuesSourceReaderOperatorStatus::readFrom;
     }
 
     @Override
-    public ValuesSourceReaderOperator.Status createTestInstance() {
-        return new ValuesSourceReaderOperator.Status(
+    public ValuesSourceReaderOperatorStatus createTestInstance() {
+        return new ValuesSourceReaderOperatorStatus(
             randomReadersBuilt(),
             randomNonNegativeLong(),
             randomNonNegativeInt(),
+            randomNonNegativeInt(),
             randomNonNegativeLong(),
             randomNonNegativeLong(),
             randomNonNegativeLong()
@@ -69,22 +71,32 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi
     }
 
     @Override
-    protected ValuesSourceReaderOperator.Status mutateInstance(ValuesSourceReaderOperator.Status instance) throws IOException {
+    protected ValuesSourceReaderOperatorStatus mutateInstance(ValuesSourceReaderOperatorStatus instance) throws IOException {
         Map<String, Integer> readersBuilt = instance.readersBuilt();
         long processNanos = instance.processNanos();
-        int pagesProcessed = instance.pagesProcessed();
+        int pagesReceived = instance.pagesReceived();
+        int pagesEmitted = instance.pagesEmitted();
         long rowsReceived = instance.rowsReceived();
         long rowsEmitted = instance.rowsEmitted();
         long valuesLoaded = instance.valuesLoaded();
-        switch (between(0, 5)) {
+        switch (between(0, 6)) {
             case 0 -> readersBuilt = randomValueOtherThan(readersBuilt, this::randomReadersBuilt);
             case 1 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong);
-            case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
-            case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
-            case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
-            case 5 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong);
+            case 2 -> pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt);
+            case 3 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
+            case 4 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
+            case 5 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
+            case 6 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong);
             default -> throw new UnsupportedOperationException();
         }
-        return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded);
+        return new ValuesSourceReaderOperatorStatus(
+            readersBuilt,
+            processNanos,
+            pagesReceived,
+            pagesEmitted,
+            rowsReceived,
+            rowsEmitted,
+            valuesLoaded
+        );
     }
 }

+ 136 - 17
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java → x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/read/ValuesSourceReaderOperatorTests.java

@@ -5,7 +5,7 @@
  * 2.0.
  */
 
-package org.elasticsearch.compute.lucene;
+package org.elasticsearch.compute.lucene.read;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.DoubleDocValuesField;
@@ -45,6 +45,12 @@ import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.DataPartitioning;
+import org.elasticsearch.compute.lucene.LuceneOperator;
+import org.elasticsearch.compute.lucene.LuceneSliceQueue;
+import org.elasticsearch.compute.lucene.LuceneSourceOperator;
+import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests;
+import org.elasticsearch.compute.lucene.ShardContext;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;
@@ -176,6 +182,10 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
+        return sourceOperator(context, pageSize);
+    }
+
+    private SourceOperator sourceOperator(DriverContext context, int pageSize) {
         var luceneFactory = new LuceneSourceOperator.Factory(
             List.of(new LuceneSourceOperatorTests.MockShardContext(reader, 0)),
             ctx -> List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
@@ -205,6 +215,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             simpleField(b, "missing_text", "text");
             b.startObject("source_text").field("type", "text").field("store", false).endObject();
             b.startObject("mv_source_text").field("type", "text").field("store", false).endObject();
+            b.startObject("long_source_text").field("type", "text").field("store", false).endObject();
             b.startObject("stored_text").field("type", "text").field("store", true).endObject();
             b.startObject("mv_stored_text").field("type", "text").field("store", true).endObject();
 
@@ -380,6 +391,33 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         return DirectoryReader.open(directory);
     }
 
+    private IndexReader initIndexLongField(Directory directory, int size, int commitEvery) throws IOException {
+        try (
+            IndexWriter writer = new IndexWriter(
+                directory,
+                newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE).setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH)
+            )
+        ) {
+            for (int d = 0; d < size; d++) {
+                XContentBuilder source = JsonXContent.contentBuilder();
+                source.startObject();
+                source.field("long_source_text", Integer.toString(d).repeat(100 * 1024));
+                source.endObject();
+                ParsedDocument doc = mapperService.documentParser()
+                    .parseDocument(
+                        new SourceToParse("id" + d, BytesReference.bytes(source), XContentType.JSON),
+                        mapperService.mappingLookup()
+                    );
+                writer.addDocuments(doc.docs());
+
+                if (d % commitEvery == commitEvery - 1) {
+                    writer.commit();
+                }
+            }
+        }
+        return DirectoryReader.open(directory);
+    }
+
     @Override
     protected Matcher<String> expectedDescriptionOfSimple() {
         return equalTo("ValuesSourceReaderOperator[fields = [long]]");
@@ -491,16 +529,23 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         Page source = CannedSourceOperator.mergePages(
             CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), between(100, 5000)))
         );
-        List<Integer> shuffleList = new ArrayList<>();
-        IntStream.range(0, source.getPositionCount()).forEach(i -> shuffleList.add(i));
-        Randomness.shuffle(shuffleList);
-        int[] shuffleArray = shuffleList.stream().mapToInt(Integer::intValue).toArray();
-        Block[] shuffledBlocks = new Block[source.getBlockCount()];
-        for (int b = 0; b < shuffledBlocks.length; b++) {
-            shuffledBlocks[b] = source.getBlock(b).filter(shuffleArray);
-        }
-        source = new Page(shuffledBlocks);
-        loadSimpleAndAssert(driverContext, List.of(source), Block.MvOrdering.UNORDERED, Block.MvOrdering.UNORDERED);
+        loadSimpleAndAssert(driverContext, List.of(shuffle(source)), Block.MvOrdering.UNORDERED, Block.MvOrdering.UNORDERED);
+    }
+
+    private Page shuffle(Page source) {
+        try {
+            List<Integer> shuffleList = new ArrayList<>();
+            IntStream.range(0, source.getPositionCount()).forEach(i -> shuffleList.add(i));
+            Randomness.shuffle(shuffleList);
+            int[] shuffleArray = shuffleList.stream().mapToInt(Integer::intValue).toArray();
+            Block[] shuffledBlocks = new Block[source.getBlockCount()];
+            for (int b = 0; b < shuffledBlocks.length; b++) {
+                shuffledBlocks[b] = source.getBlock(b).filter(shuffleArray);
+            }
+            return new Page(shuffledBlocks);
+        } finally {
+            source.releaseBlocks();
+        }
     }
 
     private static ValuesSourceReaderOperator.FieldInfo fieldInfo(MappedFieldType ft, ElementType elementType) {
@@ -612,7 +657,9 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             }
         }
         for (Operator op : operators) {
-            assertThat(((ValuesSourceReaderOperator) op).status().pagesProcessed(), equalTo(input.size()));
+            ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status();
+            assertThat(status.pagesReceived(), equalTo(input.size()));
+            assertThat(status.pagesEmitted(), equalTo(input.size()));
         }
         assertDriverContext(driverContext);
     }
@@ -696,8 +743,9 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         }
         drive(operators, input.iterator(), driverContext);
         for (int i = 0; i < cases.size(); i++) {
-            ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) operators.get(i).status();
-            assertThat(status.pagesProcessed(), equalTo(input.size()));
+            ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status();
+            assertThat(status.pagesReceived(), equalTo(input.size()));
+            assertThat(status.pagesEmitted(), equalTo(input.size()));
             FieldCase fc = cases.get(i);
             fc.checkReaders.check(fc.info.name(), allInOnePage, input.size(), reader.leaves().size(), status.readersBuilt());
         }
@@ -863,6 +911,73 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
         return r;
     }
 
+    public void testLoadLong() throws IOException {
+        testLoadLong(false, false);
+    }
+
+    public void testLoadLongManySegments() throws IOException {
+        testLoadLong(false, true);
+    }
+
+    public void testLoadLongShuffled() throws IOException {
+        testLoadLong(true, false);
+    }
+
+    public void testLoadLongShuffledManySegments() throws IOException {
+        testLoadLong(true, true);
+    }
+
+    private void testLoadLong(boolean shuffle, boolean manySegments) throws IOException {
+        int numDocs = between(10, 500);
+        initMapping();
+        keyToTags.clear();
+        reader = initIndexLongField(directory, numDocs, manySegments ? commitEvery(numDocs) : numDocs);
+
+        DriverContext driverContext = driverContext();
+        List<Page> input = CannedSourceOperator.collectPages(sourceOperator(driverContext, numDocs));
+        assertThat(reader.leaves(), hasSize(manySegments ? greaterThan(5) : equalTo(1)));
+        assertThat(input, hasSize(reader.leaves().size()));
+        if (manySegments) {
+            input = List.of(CannedSourceOperator.mergePages(input));
+        }
+        if (shuffle) {
+            input = input.stream().map(this::shuffle).toList();
+        }
+
+        Checks checks = new Checks(Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING, Block.MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING);
+
+        List<FieldCase> cases = List.of(
+            new FieldCase(
+                mapperService.fieldType("long_source_text"),
+                ElementType.BYTES_REF,
+                checks::strings,
+                StatusChecks::longTextFromSource
+            )
+        );
+        // Build one operator for each field, so we get a unique map to assert on
+        List<Operator> operators = cases.stream()
+            .map(
+                i -> new ValuesSourceReaderOperator.Factory(
+                    List.of(i.info),
+                    List.of(
+                        new ValuesSourceReaderOperator.ShardContext(
+                            reader,
+                            () -> SourceLoader.FROM_STORED_SOURCE,
+                            STORED_FIELDS_SEQUENTIAL_PROPORTIONS
+                        )
+                    ),
+                    0
+                ).get(driverContext)
+            )
+            .toList();
+        drive(operators, input.iterator(), driverContext);
+        for (int i = 0; i < cases.size(); i++) {
+            ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) operators.get(i).status();
+            assertThat(status.pagesReceived(), equalTo(input.size()));
+            assertThat(status.pagesEmitted(), equalTo(input.size()));
+        }
+    }
+
     record Checks(Block.MvOrdering booleanAndNumericalDocValuesMvOrdering, Block.MvOrdering bytesRefDocValuesMvOrdering) {
         void longs(Block block, int position, int key) {
             LongVector longs = ((LongBlock) block).asVector();
@@ -1076,6 +1191,10 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             source("source_text", "Bytes", forcedRowByRow, pageCount, segmentCount, readers);
         }
 
+        static void longTextFromSource(boolean forcedRowByRow, int pageCount, int segmentCount, Map<?, ?> readers) {
+            source("long_source_text", "Bytes", forcedRowByRow, pageCount, segmentCount, readers);
+        }
+
         static void textFromStored(boolean forcedRowByRow, int pageCount, int segmentCount, Map<?, ?> readers) {
             stored("stored_text", "Bytes", forcedRowByRow, pageCount, segmentCount, readers);
         }
@@ -1482,13 +1601,13 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
     }
 
     public void testSequentialStoredFieldsTooSmall() throws IOException {
-        testSequentialStoredFields(false, between(1, ValuesSourceReaderOperator.SEQUENTIAL_BOUNDARY - 1));
+        testSequentialStoredFields(false, between(1, ValuesFromSingleReader.SEQUENTIAL_BOUNDARY - 1));
     }
 
     public void testSequentialStoredFieldsBigEnough() throws IOException {
         testSequentialStoredFields(
             true,
-            between(ValuesSourceReaderOperator.SEQUENTIAL_BOUNDARY, ValuesSourceReaderOperator.SEQUENTIAL_BOUNDARY * 2)
+            between(ValuesFromSingleReader.SEQUENTIAL_BOUNDARY, ValuesFromSingleReader.SEQUENTIAL_BOUNDARY * 2)
         );
     }
 
@@ -1519,7 +1638,7 @@ public class ValuesSourceReaderOperatorTests extends OperatorTestCase {
             int key = keys.getInt(p);
             checks.strings(results.get(0).getBlock(2), p, key);
         }
-        ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status();
+        ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status();
         assertMap(
             status.readersBuilt(),
             matchesMap().entry("key:column_at_a_time:BlockDocValuesReader.SingletonInts", 1)
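
The test changes above also show the public surface that moved in this refactor: the per-operator status is now the top-level `ValuesSourceReaderOperatorStatus` (previously `ValuesSourceReaderOperator.Status`), it reports `pages_received` and `pages_emitted` instead of `pages_processed`, and `SEQUENTIAL_BOUNDARY` now lives on `ValuesFromSingleReader`. A minimal sketch of reading the new counters after driving the operator — assuming an already-driven operator `op` fed `inputPages` pages, plus the Hamcrest/ESTestCase helpers used throughout this test class:

```java
// Sketch only: `op` is a ValuesSourceReaderOperator that has already processed
// `inputPages` pages. Accessor names are taken from the diff above.
ValuesSourceReaderOperatorStatus status = (ValuesSourceReaderOperatorStatus) op.status();
// Until the follow-up PR actually splits pages, one page in means one page out.
assertThat(status.pagesReceived(), equalTo(inputPages));
assertThat(status.pagesEmitted(), equalTo(inputPages));
assertThat(status.valuesLoaded(), greaterThanOrEqualTo(0L));
```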

+ 3 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java

@@ -12,8 +12,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.LuceneSourceOperatorStatusTests;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorStatusTests;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatusTests;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
@@ -166,7 +166,7 @@ public class DriverProfileTests extends AbstractWireSerializingTestCase<DriverPr
     @Override
     protected NamedWriteableRegistry getNamedWriteableRegistry() {
         return new NamedWriteableRegistry(
-            List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperator.Status.ENTRY, ExchangeSinkOperator.Status.ENTRY)
+            List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperatorStatus.ENTRY, ExchangeSinkOperator.Status.ENTRY)
         );
     }
 }

+ 3 - 3
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java

@@ -12,8 +12,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.LuceneSourceOperatorStatusTests;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperatorStatusTests;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatusTests;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperatorStatusTests;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
@@ -202,7 +202,7 @@ public class DriverStatusTests extends AbstractWireSerializingTestCase<DriverSta
     @Override
     protected NamedWriteableRegistry getNamedWriteableRegistry() {
         return new NamedWriteableRegistry(
-            List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperator.Status.ENTRY, ExchangeSinkOperator.Status.ENTRY)
+            List.of(LuceneSourceOperator.Status.ENTRY, ValuesSourceReaderOperatorStatus.ENTRY, ExchangeSinkOperator.Status.ENTRY)
         );
     }
 }

+ 5 - 4
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -943,7 +943,9 @@ public class RestEsqlIT extends RestEsqlTestCase {
                 .entry("process_nanos", greaterThan(0))
                 .entry("processed_queries", List.of("*:*"))
                 .entry("partitioning_strategies", matchesMap().entry("rest-esql-test:0", "SHARD"));
-            case "ValuesSourceReaderOperator" -> basicProfile().entry("values_loaded", greaterThanOrEqualTo(0))
+            case "ValuesSourceReaderOperator" -> basicProfile().entry("pages_received", greaterThan(0))
+                .entry("pages_emitted", greaterThan(0))
+                .entry("values_loaded", greaterThanOrEqualTo(0))
                 .entry("readers_built", matchesMap().extraOk());
             case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0))
                 .entry("rows_received", greaterThan(0))
@@ -954,7 +956,7 @@ public class RestEsqlIT extends RestEsqlTestCase {
             case "ExchangeSourceOperator" -> matchesMap().entry("pages_waiting", 0)
                 .entry("pages_emitted", greaterThan(0))
                 .entry("rows_emitted", greaterThan(0));
-            case "ProjectOperator", "EvalOperator" -> basicProfile();
+            case "ProjectOperator", "EvalOperator" -> basicProfile().entry("pages_processed", greaterThan(0));
             case "LimitOperator" -> matchesMap().entry("pages_processed", greaterThan(0))
                 .entry("limit", 1000)
                 .entry("limit_remaining", 999)
@@ -990,8 +992,7 @@ public class RestEsqlIT extends RestEsqlTestCase {
     }
 
     private MapMatcher basicProfile() {
-        return matchesMap().entry("pages_processed", greaterThan(0))
-            .entry("process_nanos", greaterThan(0))
+        return matchesMap().entry("process_nanos", greaterThan(0))
             .entry("rows_received", greaterThan(0))
             .entry("rows_emitted", greaterThan(0));
     }
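
The profile assertions above boil down to a small reshuffle: `basicProfile()` no longer claims `pages_processed`, the operators that still process one page at a time (Project/Eval) add it back explicitly, and the refactored reader reports the iterator-style `pages_received`/`pages_emitted` pair. A sketch that restates the resulting matchers in one place, assuming the `MapMatcher` helpers (`matchesMap`, `basicProfile`) from this test class — nothing here goes beyond the diff:

```java
// Sketch: expected status entries per operator after this change.
private MapMatcher expectedStatus(String operator) {
    return switch (operator) {
        // Page-mapping operators keep the single pages_processed counter.
        case "ProjectOperator", "EvalOperator" -> basicProfile().entry("pages_processed", greaterThan(0));
        // The reader now extends AbstractPageMappingToIteratorOperator, so it
        // counts input and output pages separately.
        case "ValuesSourceReaderOperator" -> basicProfile().entry("pages_received", greaterThan(0))
            .entry("pages_emitted", greaterThan(0))
            .entry("values_loaded", greaterThanOrEqualTo(0))
            .entry("readers_built", matchesMap().extraOk());
        default -> throw new IllegalArgumentException("unexpected operator [" + operator + "]");
    };
}
```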

+ 4 - 3
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

@@ -19,7 +19,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
 import org.elasticsearch.compute.operator.DriverStatus;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
 import org.elasticsearch.compute.operator.OperatorStatus;
@@ -129,12 +129,13 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
                     }
                     if (o.operator().equals("ValuesSourceReaderOperator[fields = [pause_me]]")) {
                         assertThat(description, equalTo("data"));
-                        ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status();
+                        ValuesSourceReaderOperatorStatus oStatus = (ValuesSourceReaderOperatorStatus) o.status();
                         assertMap(
                             oStatus.readersBuilt(),
                             matchesMap().entry("pause_me:column_at_a_time:ScriptLongs", greaterThanOrEqualTo(1))
                         );
-                        assertThat(oStatus.pagesProcessed(), greaterThanOrEqualTo(1));
+                        assertThat(oStatus.pagesReceived(), greaterThanOrEqualTo(1));
+                        assertThat(oStatus.pagesEmitted(), greaterThanOrEqualTo(1));
                         assertThat(oStatus.valuesLoaded(), greaterThanOrEqualTo(1L));
                         valuesSourceReaders++;
                         continue;

+ 1 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java

@@ -28,7 +28,7 @@ import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.ShardContext;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.DriverRunner;

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.Build;
 import org.elasticsearch.common.util.FeatureFlag;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.rest.action.admin.cluster.RestNodesCapabilitiesAction;
 import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin;
@@ -1016,7 +1017,7 @@ public class EsqlCapabilities {
         FILTER_IN_CONVERTED_NULL,
 
         /**
-         * When creating constant null blocks in {@link org.elasticsearch.compute.lucene.ValuesSourceReaderOperator}, we also handed off
+         * When creating constant null blocks in {@link ValuesSourceReaderOperator}, we also handed off
          * the ownership of that block - but didn't account for the fact that the caller might close it, leading to double releases
          * in some union type queries. C.f. https://github.com/elastic/elasticsearch/issues/125850
          */

+ 10 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java

@@ -35,7 +35,7 @@ import org.elasticsearch.compute.data.LocalCircuitBreaker;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.Operator;
@@ -350,8 +350,14 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
                 warnings
             );
             releasables.add(queryOperator);
-            var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields);
-            releasables.add(extractFieldsOperator);
+
+            List<Operator> operators = new ArrayList<>();
+            if (request.extractFields.isEmpty() == false) {
+                var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields);
+                releasables.add(extractFieldsOperator);
+                operators.add(extractFieldsOperator);
+            }
+            operators.add(finishPages);
 
             /*
              * Collect all result Pages in a synchronizedList mostly out of paranoia. We'll
@@ -373,7 +379,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
                 driverContext,
                 request::toString,
                 queryOperator,
-                List.of(extractFieldsOperator, finishPages),
+                operators,
                 outputOperator,
                 Driver.DEFAULT_STATUS_INTERVAL,
                 Releasables.wrap(shardContext.release, localBreaker)
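
One behavioral change hides in the lookup-service diff: when a lookup request asks for no extract fields, the driver no longer wires in a field-extraction operator at all, presumably to avoid constructing a reader that has nothing to load. A condensed sketch of the conditional wiring, using the same names as the diff; the rest of the Driver construction is unchanged:

```java
// Sketch: only add the field-extraction operator when there are fields to extract.
List<Operator> operators = new ArrayList<>();
if (request.extractFields.isEmpty() == false) {
    Operator extractFields = extractFieldsOperator(shardContext.context, driverContext, request.extractFields);
    releasables.add(extractFields);
    operators.add(extractFields);
}
operators.add(finishPages);
// `operators` then replaces the old List.of(extractFieldsOperator, finishPages) in the Driver.
```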

+ 2 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -28,9 +28,9 @@ import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
-import org.elasticsearch.compute.lucene.TimeSeriesExtractFieldOperator;
 import org.elasticsearch.compute.lucene.TimeSeriesSourceOperatorFactory;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.TimeSeriesExtractFieldOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.SourceOperator;

+ 2 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -24,7 +24,7 @@ import org.elasticsearch.compute.data.BlockFactoryProvider;
 import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorStatus;
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
 import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator;
 import org.elasticsearch.compute.operator.AggregationOperator;
@@ -320,7 +320,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin
         entries.add(TimeSeriesSourceOperator.Status.ENTRY);
         entries.add(TopNOperatorStatus.ENTRY);
         entries.add(MvExpandOperator.Status.ENTRY);
-        entries.add(ValuesSourceReaderOperator.Status.ENTRY);
+        entries.add(ValuesSourceReaderOperatorStatus.ENTRY);
         entries.add(SingleValueQuery.ENTRY);
         entries.add(AsyncOperator.Status.ENTRY);
         entries.add(EnrichLookupOperator.Status.ENTRY);
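
Finally, the wire-format side: because the status implementation moved packages and was renamed, the plugin (and the test registries earlier in this diff) must register `ValuesSourceReaderOperatorStatus.ENTRY` for (de)serialization. A minimal sketch of a registry able to round-trip the refactored status — other entries elided, names taken from the diffs above:

```java
// Sketch: enough NamedWriteableRegistry entries to deserialize the refactored status.
NamedWriteableRegistry registry = new NamedWriteableRegistry(
    List.of(
        LuceneSourceOperator.Status.ENTRY,
        ValuesSourceReaderOperatorStatus.ENTRY,
        ExchangeSinkOperator.Status.ENTRY
    )
);
```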

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

@@ -23,7 +23,7 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.lucene.DataPartitioning;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.LuceneTopNSourceOperator;
-import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
+import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
 import org.elasticsearch.compute.test.NoOpReleasable;
 import org.elasticsearch.compute.test.TestBlockFactory;