ESQL: Add a `HashLookupOperator` (#107894)

Adds an operator that takes a `Block` on construction, builds a `BlockHash`,
and uses it to resolve keys to row offsets. Example time!

Say we hand-construct the operator with the first two columns of this
`Page`:
| a |  b |  v |
| -:| --:| --:|
| 1 | 11 | 21 |
| 2 | 12 | 22 |
| 2 | 14 | 23 |
| 2 | 11 | 24 |

If we then fire the first two columns of this `Page` into it, we'll
get the third column:
|     a |  b |   ord |
| -----:| --:| -----:|
|     2 | 14 |     2 |
|     1 | 11 |     0 |
|     3 | 11 |  null |
| [1,2] | 11 | [0,3] |

This is the first half of the `Operator` side of a hash join.
The second half is looking up values from those row offsets. That'd
mean adding the `v` column like so:
|     a |  b |   ord |       v |
| -----:| --:| -----:| -------:|
|     2 | 14 |     2 |      23 |
|     1 | 11 |     0 |      21 |
|     3 | 11 |  null |    null |
| [1,2] | 11 | [0,3] | [21,24] |

And *that* is comparatively simple.
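
Here's a minimal sketch of the first half in code, mirroring the tables
above. It assumes a `BlockFactory` in scope as `blockFactory` (hypothetical
wiring) and uses the `HashLookupOperator` constructor added in this commit;
`Driver` plumbing is omitted:

```java
// Key columns from the first table. The operator never closes these,
// so the caller still owns them.
Block a = blockFactory.newLongArrayVector(new long[] { 1, 2, 2, 2 }, 4).asBlock();
Block b = blockFactory.newLongArrayVector(new long[] { 11, 12, 14, 11 }, 4).asBlock();

// Channels 0 and 1 of each incoming Page are the lookup keys.
HashLookupOperator lookup = new HashLookupOperator(
    blockFactory,
    new Block[] { a, b },
    new int[] { 0, 1 }
);

// Feeding the second table's (a, b) columns appends the `ord` channel:
// (2, 14) -> 2, (1, 11) -> 0, (3, 11) -> null, ([1,2], 11) -> [0,3].
```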

Notice that I said this is the *Operator* side of a hash join. There's
no planning or distributed execution involved. Yet. And a hash join is
something you'd distribute. This `Operator` can run on a data node or a
coordinating node. It doesn't care. It just needs an input.
Nik Everett, 1 year ago
Parent
Commit
1507c8767e
14 changed files with 923 additions and 72 deletions
  1. +1 -0    server/src/main/java/org/elasticsearch/TransportVersions.java
  2. +7 -0    x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java
  3. +38 -40  x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java
  4. +32 -0   x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java
  5. +291 -0  x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java
  6. +138 -0  x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashLookupOperator.java
  7. +1 -25   x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java
  8. +70 -0   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java
  9. +60 -0   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperatorStatusTests.java
  10. +48 -0  x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashLookupOperatorTests.java
  11. +116 -0 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/IteratorAppendPageTests.java
  12. +118 -0 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/IteratorRemovePageTests.java
  13. +1 -7   x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java
  14. +2 -0   x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -183,6 +183,7 @@ public class TransportVersions {
     public static final TransportVersion INDEX_SEGMENTS_VECTOR_FORMATS = def(8_642_00_0);
     public static final TransportVersion ADD_RESOURCE_ALREADY_UPLOADED_EXCEPTION = def(8_643_00_0);
     public static final TransportVersion ESQL_MV_ORDERING_SORTED_ASCENDING = def(8_644_00_0);
+    public static final TransportVersion ESQL_PAGE_MAPPING_TO_ITERATOR = def(8_645_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 7 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java

@@ -111,6 +111,13 @@ public abstract sealed class BlockHash implements Releasable, SeenGroupIds //
         return new PackedValuesBlockHash(groups, blockFactory, emitBatchSize);
     }
 
+    /**
+     * Temporary method to build a {@link PackedValuesBlockHash}.
+     */
+    public static BlockHash buildPackedValuesBlockHash(List<GroupSpec> groups, BlockFactory blockFactory, int emitBatchSize) {
+        return new PackedValuesBlockHash(groups, blockFactory, emitBatchSize);
+    }
+
     /**
      * Creates a specialized hash table that maps a {@link Block} of the given input element type to ids.
      */

+ 38 - 40
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java

@@ -9,7 +9,6 @@ package org.elasticsearch.compute.aggregation.blockhash;
 
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
-import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BitArray;
@@ -24,6 +23,7 @@ import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.mvdedupe.BatchEncoder;
 import org.elasticsearch.compute.operator.mvdedupe.MultivalueDedupe;
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.ReleasableIterator;
 import org.elasticsearch.core.Releasables;
 
@@ -65,14 +65,14 @@ final class PackedValuesBlockHash extends BlockHash {
     private final BytesRefHash bytesRefHash;
     private final int nullTrackingBytes;
     private final BytesRefBuilder bytes = new BytesRefBuilder();
-    private final Group[] groups;
+    private final List<GroupSpec> specs;
 
     PackedValuesBlockHash(List<GroupSpec> specs, BlockFactory blockFactory, int emitBatchSize) {
         super(blockFactory);
-        this.groups = specs.stream().map(Group::new).toArray(Group[]::new);
+        this.specs = specs;
         this.emitBatchSize = emitBatchSize;
         this.bytesRefHash = new BytesRefHash(1, blockFactory.bigArrays());
-        this.nullTrackingBytes = (groups.length + 7) / 8;
+        this.nullTrackingBytes = (specs.size() + 7) / 8;
         bytes.grow(nullTrackingBytes);
     }
 
@@ -90,9 +90,9 @@ final class PackedValuesBlockHash extends BlockHash {
     /**
      * The on-heap representation of a {@code for} loop for each group key.
      */
-    private static class Group {
+    private static class Group implements Releasable {
         final GroupSpec spec;
-        BatchEncoder encoder;
+        final BatchEncoder encoder;
         int positionOffset;
         int valueOffset;
         /**
@@ -107,18 +107,25 @@ final class PackedValuesBlockHash extends BlockHash {
         int valueCount;
         int bytesStart;
 
-        Group(GroupSpec spec) {
+        Group(GroupSpec spec, Page page, int batchSize) {
             this.spec = spec;
+            this.encoder = MultivalueDedupe.batchEncoder(page.getBlock(spec.channel()), batchSize, true);
+        }
+
+        @Override
+        public void close() {
+            encoder.close();
         }
     }
 
     class AddWork extends AbstractAddBlock {
+        final Group[] groups;
         final int positionCount;
         int position;
 
         AddWork(Page page, GroupingAggregatorFunction.AddInput addInput, int batchSize) {
             super(blockFactory, emitBatchSize, addInput);
-            initializeGroupsForPage(page, batchSize);
+            this.groups = specs.stream().map(s -> new Group(s, page, batchSize)).toArray(Group[]::new);
             this.positionCount = page.getPositionCount();
         }
 
@@ -129,7 +136,7 @@ final class PackedValuesBlockHash extends BlockHash {
          */
         void add() {
             for (position = 0; position < positionCount; position++) {
-                boolean singleEntry = startPosition();
+                boolean singleEntry = startPosition(groups);
                 if (singleEntry) {
                     addSingleEntry();
                 } else {
@@ -140,7 +147,7 @@ final class PackedValuesBlockHash extends BlockHash {
         }
 
         private void addSingleEntry() {
-            fillBytesSv();
+            fillBytesSv(groups);
             ords.appendInt(Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))));
             addedValue(position);
         }
@@ -149,13 +156,13 @@ final class PackedValuesBlockHash extends BlockHash {
             ords.beginPositionEntry();
             int g = 0;
             do {
-                fillBytesMv(g);
+                fillBytesMv(groups, g);
 
                 // emit ords
                 ords.appendInt(Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))));
                 addedValueInMultivaluePosition(position);
 
-                g = rewindKeys();
+                g = rewindKeys(groups);
             } while (g >= 0);
             ords.endPositionEntry();
             for (Group group : groups) {
@@ -165,10 +172,7 @@ final class PackedValuesBlockHash extends BlockHash {
 
         @Override
         public void close() {
-            Releasables.closeExpectNoException(
-                super::close,
-                Releasables.wrap(() -> Iterators.map(Iterators.forArray(groups), g -> g.encoder))
-            );
+            Releasables.closeExpectNoException(super::close, Releasables.wrap(groups));
         }
     }
 
@@ -178,14 +182,15 @@ final class PackedValuesBlockHash extends BlockHash {
     }
 
     class LookupWork implements ReleasableIterator<IntBlock> {
+        private final Group[] groups;
         private final long targetBytesSize;
         private final int positionCount;
         private int position;
 
         LookupWork(Page page, long targetBytesSize, int batchSize) {
+            this.groups = specs.stream().map(s -> new Group(s, page, batchSize)).toArray(Group[]::new);
             this.positionCount = page.getPositionCount();
             this.targetBytesSize = targetBytesSize;
-            initializeGroupsForPage(page, batchSize);
         }
 
         @Override
@@ -198,7 +203,7 @@ final class PackedValuesBlockHash extends BlockHash {
             int size = Math.toIntExact(Math.min(Integer.MAX_VALUE, targetBytesSize / Integer.BYTES / 2));
             try (IntBlock.Builder ords = blockFactory.newIntBlockBuilder(size)) {
                 while (position < positionCount && ords.estimatedBytes() < targetBytesSize) {
-                    boolean singleEntry = startPosition();
+                    boolean singleEntry = startPosition(groups);
                     if (singleEntry) {
                         lookupSingleEntry(ords);
                     } else {
@@ -211,7 +216,7 @@ final class PackedValuesBlockHash extends BlockHash {
         }
 
         private void lookupSingleEntry(IntBlock.Builder ords) {
-            fillBytesSv();
+            fillBytesSv(groups);
             long found = bytesRefHash.find(bytes.get());
             if (found < 0) {
                 ords.appendNull();
@@ -226,7 +231,7 @@ final class PackedValuesBlockHash extends BlockHash {
             int g = 0;
             int count = 0;
             do {
-                fillBytesMv(g);
+                fillBytesMv(groups, g);
 
                 // emit ords
                 long found = bytesRefHash.find(bytes.get());
@@ -248,7 +253,7 @@ final class PackedValuesBlockHash extends BlockHash {
                         }
                     }
                 }
-                g = rewindKeys();
+                g = rewindKeys(groups);
             } while (g >= 0);
             if (firstFound < 0) {
                 ords.appendNull();
@@ -265,24 +270,17 @@ final class PackedValuesBlockHash extends BlockHash {
 
         @Override
         public void close() {
-            Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(Iterators.forArray(groups), g -> g.encoder)));
-        }
-    }
-
-    private void initializeGroupsForPage(Page page, int batchSize) {
-        for (Group group : groups) {
-            Block b = page.getBlock(group.spec.channel());
-            group.encoder = MultivalueDedupe.batchEncoder(b, batchSize, true);
+            Releasables.closeExpectNoException(groups);
         }
     }
 
     /**
-     * Correctly position all {@link #groups}, clear the {@link #bytes},
+     * Correctly position all {@code groups}, clear the {@link #bytes},
      * and position it past the null tracking bytes. Call this before
      * encoding a new position.
      * @return true if this position has only a single ordinal
      */
-    private boolean startPosition() {
+    private boolean startPosition(Group[] groups) {
         boolean singleEntry = true;
         for (Group g : groups) {
             /*
@@ -304,7 +302,7 @@ final class PackedValuesBlockHash extends BlockHash {
         return singleEntry;
     }
 
-    private void fillBytesSv() {
+    private void fillBytesSv(Group[] groups) {
         for (int g = 0; g < groups.length; g++) {
             Group group = groups[g];
             assert group.writtenValues == 0;
@@ -317,7 +315,7 @@ final class PackedValuesBlockHash extends BlockHash {
         }
     }
 
-    private void fillBytesMv(int startingGroup) {
+    private void fillBytesMv(Group[] groups, int startingGroup) {
         for (int g = startingGroup; g < groups.length; g++) {
             Group group = groups[g];
             group.bytesStart = bytes.length();
@@ -331,7 +329,7 @@ final class PackedValuesBlockHash extends BlockHash {
         }
     }
 
-    private int rewindKeys() {
+    private int rewindKeys(Group[] groups) {
         int g = groups.length - 1;
         Group group = groups[g];
         bytes.setLength(group.bytesStart);
@@ -350,11 +348,11 @@ final class PackedValuesBlockHash extends BlockHash {
     @Override
     public Block[] getKeys() {
         int size = Math.toIntExact(bytesRefHash.size());
-        BatchEncoder.Decoder[] decoders = new BatchEncoder.Decoder[groups.length];
-        Block.Builder[] builders = new Block.Builder[groups.length];
+        BatchEncoder.Decoder[] decoders = new BatchEncoder.Decoder[specs.size()];
+        Block.Builder[] builders = new Block.Builder[specs.size()];
         try {
             for (int g = 0; g < builders.length; g++) {
-                ElementType elementType = groups[g].spec.elementType();
+                ElementType elementType = specs.get(g).elementType();
                 decoders[g] = BatchEncoder.decoder(elementType);
                 builders[g] = elementType.newBlockBuilder(size, blockFactory);
             }
@@ -424,12 +422,12 @@ final class PackedValuesBlockHash extends BlockHash {
         StringBuilder b = new StringBuilder();
         b.append("PackedValuesBlockHash{groups=[");
         boolean first = true;
-        for (int i = 0; i < groups.length; i++) {
+        for (int i = 0; i < specs.size(); i++) {
             if (i > 0) {
                 b.append(", ");
             }
-            Group group = groups[i];
-            b.append(group.spec.channel()).append(':').append(group.spec.elementType());
+            GroupSpec spec = specs.get(i);
+            b.append(spec.channel()).append(':').append(spec.elementType());
         }
         b.append("], entries=").append(bytesRefHash.size());
         b.append(", size=").append(ByteSizeValue.ofBytes(bytesRefHash.ramBytesUsed()));

+ 32 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Page.java

@@ -259,4 +259,36 @@ public final class Page implements Writeable {
         }
         return new Page(blocks);
     }
+
+    /**
+     * Returns a new page with the contained {@link Block}s reordered,
+     * duplicated, or removed. The new {@link Page} will have as
+     * many blocks as the {@code length} of the provided array. Those
+     * blocks will be set to the block at the position of the
+     * <strong>value</strong> of each entry in the parameter.
+     */
+    public Page projectBlocks(int[] blockMapping) {
+        if (blocksReleased) {
+            throw new IllegalStateException("can't read released page");
+        }
+        Block[] mapped = new Block[blockMapping.length];
+        try {
+            for (int b = 0; b < blockMapping.length; b++) {
+                if (blockMapping[b] >= blocks.length) {
+                    throw new IllegalArgumentException(
+                        "Cannot project block with index [" + blockMapping[b] + "] from a page with size [" + blocks.length + "]"
+                    );
+                }
+                mapped[b] = blocks[blockMapping[b]];
+                mapped[b].incRef();
+            }
+            Page result = new Page(false, getPositionCount(), mapped);
+            mapped = null;
+            return result;
+        } finally {
+            if (mapped != null) {
+                Releasables.close(mapped);
+            }
+        }
+    }
 }

+ 291 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java

@@ -0,0 +1,291 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.ReleasableIterator;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * Maps a single {@link Page} into zero or more resulting pages.
+ */
+public abstract class AbstractPageMappingToIteratorOperator implements Operator {
+    private ReleasableIterator<Page> next;
+
+    private boolean finished = false;
+
+    /**
+     * Number of nanoseconds this operation has spent processing pages.
+     */
+    private long processNanos;
+
+    /**
+     * Count of pages that have been received by this operator.
+     */
+    private int pagesReceived;
+
+    /**
+     * Count of pages that have been emitted by this operator.
+     */
+    private int pagesEmitted;
+
+    /**
+     * Build a {@link ReleasableIterator} of results for a new page.
+     */
+    protected abstract ReleasableIterator<Page> receive(Page page);
+
+    /**
+     * Append an {@link Iterator} of {@link Block}s to a {@link Page}, one
+     * after the other. It's required that the iterator emit as many
+     * <strong>positions</strong> as there were in the page.
+     */
+    public static ReleasableIterator<Page> appendBlocks(Page page, ReleasableIterator<? extends Block> toAdd) {
+        return new AppendBlocksIterator(page, toAdd);
+    }
+
+    @Override
+    public abstract String toString();
+
+    @Override
+    public final boolean needsInput() {
+        return finished == false && (next == null || next.hasNext() == false);
+    }
+
+    @Override
+    public final void addInput(Page page) {
+        if (next != null) {
+            assert next.hasNext() == false : "has pending input page";
+            next.close();
+        }
+        if (page.getPositionCount() == 0) {
+            return;
+        }
+        next = new RuntimeTrackingIterator(receive(page));
+        pagesReceived++;
+    }
+
+    @Override
+    public final void finish() {
+        finished = true;
+    }
+
+    @Override
+    public final boolean isFinished() {
+        return finished && (next == null || next.hasNext() == false);
+    }
+
+    @Override
+    public final Page getOutput() {
+        if (next == null || next.hasNext() == false) {
+            return null;
+        }
+        Page ret = next.next();
+        pagesEmitted++;
+        return ret;
+    }
+
+    @Override
+    public final AbstractPageMappingToIteratorOperator.Status status() {
+        return status(processNanos, pagesReceived, pagesEmitted);
+    }
+
+    protected AbstractPageMappingToIteratorOperator.Status status(long processNanos, int pagesReceived, int pagesEmitted) {
+        return new AbstractPageMappingToIteratorOperator.Status(processNanos, pagesReceived, pagesEmitted);
+    }
+
+    @Override
+    public void close() {
+        Releasables.closeExpectNoException(next);
+    }
+
+    private class RuntimeTrackingIterator implements ReleasableIterator<Page> {
+        private final ReleasableIterator<Page> next;
+
+        private RuntimeTrackingIterator(ReleasableIterator<Page> next) {
+            this.next = next;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return next.hasNext();
+        }
+
+        @Override
+        public Page next() {
+            long start = System.nanoTime();
+            Page out = next.next();
+            processNanos += System.nanoTime() - start;
+            return out;
+        }
+
+        @Override
+        public void close() {
+            next.close();
+        }
+    }
+
+    public static class Status implements Operator.Status {
+        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+            Operator.Status.class,
+            "page_mapping_to_iterator",
+            AbstractPageMappingToIteratorOperator.Status::new
+        );
+
+        private final long processNanos;
+        private final int pagesReceived;
+        private final int pagesEmitted;
+
+        public Status(long processNanos, int pagesReceived, int pagesEmitted) {
+            this.processNanos = processNanos;
+            this.pagesReceived = pagesReceived;
+            this.pagesEmitted = pagesEmitted;
+        }
+
+        protected Status(StreamInput in) throws IOException {
+            processNanos = in.readVLong();
+            pagesReceived = in.readVInt();
+            pagesEmitted = in.readVInt();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVLong(processNanos);
+            out.writeVInt(pagesReceived);
+            out.writeVInt(pagesEmitted);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return ENTRY.name;
+        }
+
+        public int pagesReceived() {
+            return pagesReceived;
+        }
+
+        public int pagesEmitted() {
+            return pagesEmitted;
+        }
+
+        public long processNanos() {
+            return processNanos;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            innerToXContent(builder);
+            return builder.endObject();
+        }
+
+        /**
+         * Render the body of the object for this status. Protected so subclasses
+         * can call it to render the "default" body.
+         */
+        protected final XContentBuilder innerToXContent(XContentBuilder builder) throws IOException {
+            builder.field("process_nanos", processNanos);
+            if (builder.humanReadable()) {
+                builder.field("process_time", TimeValue.timeValueNanos(processNanos));
+            }
+            builder.field("pages_received", pagesReceived);
+            return builder.field("pages_emitted", pagesEmitted);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            AbstractPageMappingToIteratorOperator.Status status = (AbstractPageMappingToIteratorOperator.Status) o;
+            return processNanos == status.processNanos && pagesReceived == status.pagesReceived && pagesEmitted == status.pagesEmitted;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(processNanos, pagesReceived, pagesEmitted);
+        }
+
+        @Override
+        public String toString() {
+            return Strings.toString(this);
+        }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            return TransportVersions.ESQL_PAGE_MAPPING_TO_ITERATOR;
+        }
+    }
+
+    private static class AppendBlocksIterator implements ReleasableIterator<Page> {
+        private final Page page;
+        private final ReleasableIterator<? extends Block> next;
+
+        private int positionOffset;
+
+        protected AppendBlocksIterator(Page page, ReleasableIterator<? extends Block> next) {
+            this.page = page;
+            this.next = next;
+        }
+
+        @Override
+        public final boolean hasNext() {
+            if (next.hasNext()) {
+                assert positionOffset < page.getPositionCount();
+                return true;
+            }
+            assert positionOffset == page.getPositionCount();
+            return false;
+        }
+
+        @Override
+        public final Page next() {
+            Block read = next.next();
+            int start = positionOffset;
+            positionOffset += read.getPositionCount();
+            if (start == 0 && read.getPositionCount() == page.getPositionCount()) {
+                for (int b = 0; b < page.getBlockCount(); b++) {
+                    page.getBlock(b).incRef();
+                }
+                return page.appendBlock(read);
+            }
+            Block[] newBlocks = new Block[page.getBlockCount() + 1];
+            newBlocks[page.getBlockCount()] = read;
+            try {
+                // TODO a way to filter with a range please.
+                int[] positions = IntStream.range(start, positionOffset).toArray();
+                for (int b = 0; b < page.getBlockCount(); b++) {
+                    newBlocks[b] = page.getBlock(b).filter(positions);
+                }
+                Page result = new Page(newBlocks);
+                Arrays.fill(newBlocks, null);
+                return result;
+            } finally {
+                Releasables.closeExpectNoException(newBlocks);
+            }
+        }
+
+        @Override
+        public void close() {
+            Releasables.closeExpectNoException(page::releaseBlocks, next);
+        }
+    }
+}

+ 138 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashLookupOperator.java

@@ -0,0 +1,138 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.ReleasableIterator;
+import org.elasticsearch.core.Releasables;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class HashLookupOperator extends AbstractPageMappingToIteratorOperator {
+    /**
+     * Factory for {@link HashLookupOperator}. The received {@link Block}s
+     * are never closed, so we need to build them from a non-tracking factory.
+     */
+    public static class Factory implements Operator.OperatorFactory {
+        private final Block[] keys;
+        private final int[] blockMapping;
+
+        public Factory(Block[] keys, int[] blockMapping) {
+            this.keys = keys;
+            this.blockMapping = blockMapping;
+        }
+
+        @Override
+        public Operator get(DriverContext driverContext) {
+            return new HashLookupOperator(driverContext.blockFactory(), keys, blockMapping);
+        }
+
+        @Override
+        public String describe() {
+            StringBuilder b = new StringBuilder();
+            b.append("HashLookup[keys=[");
+            for (int k = 0; k < keys.length; k++) {
+                Block key = keys[k];
+                if (k != 0) {
+                    b.append(", ");
+                }
+                b.append("{type=").append(key.elementType());
+                b.append(", positions=").append(key.getPositionCount());
+                b.append(", size=").append(ByteSizeValue.ofBytes(key.ramBytesUsed())).append("}");
+            }
+            b.append("], mapping=").append(Arrays.toString(blockMapping)).append("]");
+            return b.toString();
+        }
+    }
+
+    private final BlockHash hash;
+    private final int[] blockMapping;
+
+    public HashLookupOperator(BlockFactory blockFactory, Block[] keys, int[] blockMapping) {
+        this.blockMapping = blockMapping;
+        List<BlockHash.GroupSpec> groups = new ArrayList<>(keys.length);
+        for (int k = 0; k < keys.length; k++) {
+            groups.add(new BlockHash.GroupSpec(k, keys[k].elementType()));
+        }
+        /*
+         * Force PackedValuesBlockHash because it assigns ordinals in order
+         * of arrival. We'll figure out how to adapt other block hashes to
+         * do that soon. We must also figure out how to map ordinals to rows
+         * and, probably at the same time, handle multiple rows containing
+         * the same keys.
+         */
+        this.hash = BlockHash.buildPackedValuesBlockHash(
+            groups,
+            blockFactory,
+            (int) BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE.getBytes()
+        );
+        boolean success = false;
+        try {
+            final int[] lastOrd = new int[] { -1 };
+            hash.add(new Page(keys), new GroupingAggregatorFunction.AddInput() {
+                @Override
+                public void add(int positionOffset, IntBlock groupIds) {
+                    // TODO support multiple rows with the same keys
+                    for (int p = 0; p < groupIds.getPositionCount(); p++) {
+                        int first = groupIds.getFirstValueIndex(p);
+                        int end = groupIds.getValueCount(p) + first;
+                        for (int i = first; i < end; i++) {
+                            int ord = groupIds.getInt(i);
+                            if (ord != lastOrd[0] + 1) {
+                                throw new IllegalArgumentException("found a duplicate row");
+                            }
+                            lastOrd[0] = ord;
+                        }
+                    }
+                }
+
+                @Override
+                public void add(int positionOffset, IntVector groupIds) {
+                    for (int p = 0; p < groupIds.getPositionCount(); p++) {
+                        int ord = groupIds.getInt(p);
+                        if (ord != lastOrd[0] + 1) {
+                            throw new IllegalArgumentException("found a duplicate row");
+                        }
+                        lastOrd[0] = ord;
+                    }
+                }
+            });
+            success = true;
+        } finally {
+            if (success == false) {
+                close();
+            }
+        }
+    }
+
+    @Override
+    protected ReleasableIterator<Page> receive(Page page) {
+        Page mapped = page.projectBlocks(blockMapping);
+        page.releaseBlocks();
+        return appendBlocks(mapped, hash.lookup(mapped, BlockFactory.DEFAULT_MAX_BLOCK_PRIMITIVE_ARRAY_SIZE));
+    }
+
+    @Override
+    public String toString() {
+        return "HashLookup[hash=" + hash + ", mapping=" + Arrays.toString(blockMapping) + "]";
+    }
+
+    @Override
+    public void close() {
+        Releasables.close(super::close, hash);
+    }
+}
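
One behavior worth noting, per the `AddInput` checks in the constructor
above: key rows must currently be unique, so duplicates fail fast at
construction time. A sketch of the failure mode, again assuming a
`blockFactory` in scope:

```java
Block dup = blockFactory.newLongArrayVector(new long[] { 7, 7, 20 }, 3).asBlock();
try {
    new HashLookupOperator(blockFactory, new Block[] { dup }, new int[] { 0 });
} catch (IllegalArgumentException e) {
    // throws "found a duplicate row": the second 7 maps to an already-seen ordinal
} finally {
    dup.close();
}
```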

+ 1 - 25
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/ProjectOperator.java

@@ -7,9 +7,7 @@
 
 package org.elasticsearch.compute.operator;
 
-import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.core.Releasables;
 
 import java.util.Arrays;
 import java.util.List;
@@ -31,7 +29,6 @@ public class ProjectOperator extends AbstractPageMappingOperator {
     }
 
     private final int[] projection;
-    private final Block[] blocks;
 
     /**
      * Creates an operator that applies the given projection, encoded as an integer list where
@@ -42,7 +39,6 @@ public class ProjectOperator extends AbstractPageMappingOperator {
      */
     public ProjectOperator(List<Integer> projection) {
         this.projection = projection.stream().mapToInt(Integer::intValue).toArray();
-        this.blocks = new Block[projection.size()];
     }
 
     @Override
@@ -51,29 +47,9 @@ public class ProjectOperator extends AbstractPageMappingOperator {
         if (blockCount == 0) {
             return page;
         }
-        Page output = null;
         try {
-            int b = 0;
-            for (int source : projection) {
-                if (source >= blockCount) {
-                    throw new IllegalArgumentException(
-                        "Cannot project block with index [" + source + "] from a page with size [" + blockCount + "]"
-                    );
-                }
-                var block = page.getBlock(source);
-                blocks[b++] = block;
-                block.incRef();
-            }
-            int positionCount = page.getPositionCount();
-            // Use positionCount explicitly to avoid re-computing - also, if the projection is empty, there may be
-            // no more blocks left to determine the positionCount from.
-            output = new Page(positionCount, blocks);
-            return output;
+            return page.projectBlocks(projection);
         } finally {
-            if (output == null) {
-                Releasables.close(blocks);
-            }
-            Arrays.fill(blocks, null);
             page.releaseBlocks();
         }
     }

+ 70 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

@@ -37,6 +37,7 @@ import org.elasticsearch.compute.aggregation.CountAggregatorFunction;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.BlockTestUtils;
 import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.DocBlock;
 import org.elasticsearch.compute.data.DocVector;
@@ -54,6 +55,7 @@ import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.HashAggregationOperator;
+import org.elasticsearch.compute.operator.HashLookupOperator;
 import org.elasticsearch.compute.operator.LimitOperator;
 import org.elasticsearch.compute.operator.Operator;
 import org.elasticsearch.compute.operator.OperatorTestCase;
@@ -71,12 +73,14 @@ import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 
 import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
 import static org.elasticsearch.compute.aggregation.AggregatorMode.INITIAL;
@@ -324,6 +328,72 @@ public class OperatorTests extends MapperServiceTestCase {
         return docIds;
     }
 
+    public void testHashLookup() {
+        // TODO move this to an integration test once we've plugged in the lookup
+        DriverContext driverContext = driverContext();
+        Map<Long, Integer> primeOrds = new TreeMap<>();
+        Block primesBlock;
+        try (LongBlock.Builder primes = driverContext.blockFactory().newLongBlockBuilder(30)) {
+            boolean[] sieve = new boolean[100];
+            Arrays.fill(sieve, true);
+            sieve[0] = false;
+            sieve[1] = false;
+            int prime = 2;
+            while (prime < 100) {
+                if (false == sieve[prime]) {
+                    prime++;
+                    continue;
+                }
+                primes.appendLong(prime);
+                primeOrds.put((long) prime, primeOrds.size());
+                for (int m = prime + prime; m < sieve.length; m += prime) {
+                    sieve[m] = false;
+                }
+                prime++;
+            }
+            primesBlock = primes.build();
+        }
+        try {
+            List<Long> values = new ArrayList<>();
+            List<Object> expectedValues = new ArrayList<>();
+            List<Object> expectedPrimeOrds = new ArrayList<>();
+            for (int i = 0; i < 100; i++) {
+                long v = i % 10 == 0 ? randomFrom(primeOrds.keySet()) : randomLongBetween(0, 100);
+                values.add(v);
+                expectedValues.add(v);
+                expectedPrimeOrds.add(primeOrds.get(v));
+            }
+
+            var actualValues = new ArrayList<>();
+            var actualPrimeOrds = new ArrayList<>();
+            try (
+                var driver = new Driver(
+                    driverContext,
+                    new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
+                    List.of(new HashLookupOperator(driverContext.blockFactory(), new Block[] { primesBlock }, new int[] { 0 })),
+                    new PageConsumerOperator(page -> {
+                        try {
+                            BlockTestUtils.readInto(actualValues, page.getBlock(0));
+                            BlockTestUtils.readInto(actualPrimeOrds, page.getBlock(1));
+                        } finally {
+                            page.releaseBlocks();
+                        }
+                    }),
+                    () -> {}
+                )
+            ) {
+                OperatorTestCase.runDriver(driver);
+            }
+
+            assertThat(actualValues, equalTo(expectedValues));
+            assertThat(actualPrimeOrds, equalTo(expectedPrimeOrds));
+            assertDriverContext(driverContext);
+        } finally {
+            primesBlock.close();
+        }
+    }
+
     /**
      * Creates a {@link BigArrays} that tracks releases but doesn't throw circuit breaking exceptions.
      */

+ 60 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperatorStatusTests.java

@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class AbstractPageMappingToIteratorOperatorStatusTests extends AbstractWireSerializingTestCase<
+    AbstractPageMappingToIteratorOperator.Status> {
+    public static AbstractPageMappingToIteratorOperator.Status simple() {
+        return new AbstractPageMappingToIteratorOperator.Status(200012, 123, 204);
+    }
+
+    public static String simpleToJson() {
+        return """
+            {
+              "process_nanos" : 200012,
+              "process_time" : "200micros",
+              "pages_received" : 123,
+              "pages_emitted" : 204
+            }""";
+    }
+
+    public void testToXContent() {
+        assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
+    }
+
+    @Override
+    protected Writeable.Reader<AbstractPageMappingToIteratorOperator.Status> instanceReader() {
+        return AbstractPageMappingToIteratorOperator.Status::new;
+    }
+
+    @Override
+    public AbstractPageMappingToIteratorOperator.Status createTestInstance() {
+        return new AbstractPageMappingToIteratorOperator.Status(randomNonNegativeLong(), randomNonNegativeInt(), randomNonNegativeInt());
+    }
+
+    @Override
+    protected AbstractPageMappingToIteratorOperator.Status mutateInstance(AbstractPageMappingToIteratorOperator.Status instance) {
+        long processNanos = instance.processNanos();
+        int pagesReceived = instance.pagesReceived();
+        int pagesEmitted = instance.pagesEmitted();
+        switch (between(0, 2)) {
+            case 0 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong);
+            case 1 -> pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt);
+            case 2 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
+            default -> throw new UnsupportedOperationException();
+        }
+        return new AbstractPageMappingToIteratorOperator.Status(processNanos, pagesReceived, pagesEmitted);
+    }
+}

+ 48 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashLookupOperatorTests.java

@@ -0,0 +1,48 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.data.TestBlockFactory;
+
+import java.util.List;
+import java.util.stream.LongStream;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class HashLookupOperatorTests extends OperatorTestCase {
+    @Override
+    protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
+        return new SequenceLongBlockSourceOperator(blockFactory, LongStream.range(0, size).map(l -> randomFrom(1, 7, 14, 20)));
+    }
+
+    @Override
+    protected void assertSimpleOutput(List<Page> input, List<Page> results) {
+        assertThat(results.stream().mapToInt(Page::getPositionCount).sum(), equalTo(input.stream().mapToInt(Page::getPositionCount).sum()));
+    }
+
+    @Override
+    protected Operator.OperatorFactory simple() {
+        return new HashLookupOperator.Factory(
+            new Block[] { TestBlockFactory.getNonBreakingInstance().newLongArrayVector(new long[] { 7, 14, 20 }, 3).asBlock() },
+            new int[] { 0 }
+        );
+    }
+
+    @Override
+    protected String expectedDescriptionOfSimple() {
+        return "HashLookup[keys=[{type=LONG, positions=3, size=96b}], mapping=[0]]";
+    }
+
+    @Override
+    protected String expectedToStringOfSimple() {
+        return "HashLookup[hash=PackedValuesBlockHash{groups=[0:LONG], entries=3, size=536b}, mapping=[0]]";
+    }
+}

+ 116 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/IteratorAppendPageTests.java

@@ -0,0 +1,116 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.data.TestBlockFactory;
+import org.elasticsearch.core.ReleasableIterator;
+
+import java.util.List;
+import java.util.stream.LongStream;
+
+import static org.hamcrest.Matchers.equalTo;
+
+/**
+ * Tests {@link AbstractPageMappingToIteratorOperator} against a test
+ * subclass that appends {@code 1} and chunks the incoming {@link Page}
+ * at {@code 100} positions.
+ */
+public class IteratorAppendPageTests extends OperatorTestCase {
+    private static final int ADDED_VALUE = 1;
+    private static final int CHUNK = 100;
+
+    private static class IteratorAppendPage extends AbstractPageMappingToIteratorOperator {
+        private static class Factory implements Operator.OperatorFactory {
+            @Override
+            public Operator get(DriverContext driverContext) {
+                return new IteratorAppendPage(driverContext.blockFactory());
+            }
+
+            @Override
+            public String describe() {
+                return "IteratorAppendPage[]";
+            }
+        }
+
+        private final BlockFactory blockFactory;
+
+        private IteratorAppendPage(BlockFactory blockFactory) {
+            this.blockFactory = blockFactory;
+        }
+
+        @Override
+        protected ReleasableIterator<Page> receive(Page page) {
+            return appendBlocks(page, new ReleasableIterator<>() {
+                private int positionOffset;
+
+                @Override
+                public boolean hasNext() {
+                    return positionOffset < page.getPositionCount();
+                }
+
+                @Override
+                public Block next() {
+                    if (hasNext() == false) {
+                        throw new IllegalStateException();
+                    }
+                    int positions = Math.min(page.getPositionCount() - positionOffset, CHUNK);
+                    positionOffset += positions;
+                    return blockFactory.newConstantIntBlockWith(ADDED_VALUE, positions);
+                }
+
+                @Override
+                public void close() {
+                    // Nothing to do, appendBlocks iterator closes the page for us.
+                }
+            });
+        }
+
+        @Override
+        public String toString() {
+            return "IteratorAppendPage[]";
+        }
+    }
+
+    @Override
+    protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
+        return new SequenceLongBlockSourceOperator(blockFactory, LongStream.range(0, size).map(l -> randomLong()));
+    }
+
+    @Override
+    protected void assertSimpleOutput(List<Page> input, List<Page> results) {
+        int r = 0;
+        for (Page in : input) {
+            for (int offset = 0; offset < in.getPositionCount(); offset += CHUNK) {
+                Page resultPage = results.get(r++);
+                assertThat(resultPage.getPositionCount(), equalTo(Math.min(CHUNK, in.getPositionCount() - offset)));
+                assertThat(
+                    resultPage.getBlock(1),
+                    equalTo(TestBlockFactory.getNonBreakingInstance().newConstantIntBlockWith(ADDED_VALUE, resultPage.getPositionCount()))
+                );
+            }
+        }
+    }
+
+    @Override
+    protected Operator.OperatorFactory simple() {
+        return new IteratorAppendPage.Factory();
+    }
+
+    @Override
+    protected String expectedDescriptionOfSimple() {
+        return "IteratorAppendPage[]";
+    }
+
+    @Override
+    protected String expectedToStringOfSimple() {
+        return expectedDescriptionOfSimple();
+    }
+}

+ 118 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/IteratorRemovePageTests.java

@@ -0,0 +1,118 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.ReleasableIterator;
+
+import java.util.List;
+import java.util.stream.LongStream;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+
+/**
+ * Tests {@link AbstractPageMappingToIteratorOperator} against a test
+ * subclass that removes every other page.
+ */
+public class IteratorRemovePageTests extends OperatorTestCase {
+    private static class IteratorRemovePage extends AbstractPageMappingToIteratorOperator {
+        private static class Factory implements OperatorFactory {
+            @Override
+            public Operator get(DriverContext driverContext) {
+                return new IteratorRemovePage();
+            }
+
+            @Override
+            public String describe() {
+                return "IteratorRemovePage[]";
+            }
+        }
+
+        private boolean keep = true;
+
+        @Override
+        protected ReleasableIterator<Page> receive(Page page) {
+            if (keep) {
+                keep = false;
+                return new ReleasableIterator<>() {
+                    Page p = page;
+
+                    @Override
+                    public boolean hasNext() {
+                        return p != null;
+                    }
+
+                    @Override
+                    public Page next() {
+                        Page ret = p;
+                        p = null;
+                        return ret;
+                    }
+
+                    @Override
+                    public void close() {
+                        if (p != null) {
+                            p.releaseBlocks();
+                        }
+                    }
+                };
+            }
+            keep = true;
+            page.releaseBlocks();
+            return new ReleasableIterator<>() {
+                @Override
+                public boolean hasNext() {
+                    return false;
+                }
+
+                @Override
+                public Page next() {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public void close() {}
+            };
+        }
+
+        @Override
+        public String toString() {
+            return "IteratorRemovePage[]";
+        }
+    }
+
+    @Override
+    protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
+        return new SequenceLongBlockSourceOperator(blockFactory, LongStream.range(0, size).map(l -> randomLong()));
+    }
+
+    @Override
+    protected void assertSimpleOutput(List<Page> input, List<Page> results) {
+        assertThat(results, hasSize((input.size() + 1) / 2));
+        for (int i = 0; i < input.size(); i += 2) {
+            assertThat(input.get(i), equalTo(results.get(i / 2)));
+        }
+    }
+
+    @Override
+    protected Operator.OperatorFactory simple() {
+        return new IteratorRemovePage.Factory();
+    }
+
+    @Override
+    protected String expectedDescriptionOfSimple() {
+        return "IteratorRemovePage[]";
+    }
+
+    @Override
+    protected String expectedToStringOfSimple() {
+        return expectedDescriptionOfSimple();
+    }
+}

+ 1 - 7
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java

@@ -201,19 +201,13 @@ public abstract class OperatorTestCase extends AnyOperatorTestCase {
 
         // Clone the input so that the operator can close it, then, later, we can read it again to build the assertion.
         List<Page> origInput = BlockTestUtils.deepCopyOf(input, TestBlockFactory.getNonBreakingInstance());
-        BigArrays bigArrays = context.bigArrays().withCircuitBreaking();
 
         List<Page> results = drive(simple().get(context), input.iterator(), context);
         assertSimpleOutput(origInput, results);
-        assertThat(bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST).getUsed(), equalTo(0L));
+        assertThat(context.breaker().getUsed(), equalTo(0L));
 
-        List<Block> resultBlocks = new ArrayList<>();
         // Release all result blocks. After this, all input blocks should be released as well, otherwise we have a leak.
         for (Page p : results) {
-            for (int i = 0; i < p.getBlockCount(); i++) {
-                resultBlocks.add(p.getBlock(i));
-            }
-
             p.releaseBlocks();
         }
 

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -25,6 +25,7 @@ import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.lucene.LuceneOperator;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
+import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator;
 import org.elasticsearch.compute.operator.AggregationOperator;
 import org.elasticsearch.compute.operator.AsyncOperator;
 import org.elasticsearch.compute.operator.DriverStatus;
@@ -175,6 +176,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
             List.of(
                 DriverStatus.ENTRY,
                 AbstractPageMappingOperator.Status.ENTRY,
+                AbstractPageMappingToIteratorOperator.Status.ENTRY,
                 AggregationOperator.Status.ENTRY,
                 ExchangeSinkOperator.Status.ENTRY,
                 ExchangeSourceOperator.Status.ENTRY,