瀏覽代碼

Leverage ordinals in enrich lookup (#107449)

This change leverages ordinals in enrich lookup. Instead of looking up
and extracting enrich fields for every input term, it looks up and
extracts them only for the distinct terms in the dictionary, then uses
the ordinals to map the enrich results back to the input positions.

```
| 50th percentile | esql_stats_enrich_rates_fares | 242.949 | 34.7007 | -208.248 | ms |  -85.72% |
| 90th percentile | esql_stats_enrich_rates_fares | 245.479 | 36.3419 | -209.137 | ms |  -85.20% |
| 100th percentile | esql_stats_enrich_rates_fares | 252.877 | 49.0826 | -203.795 | ms |  -80.59% |
```
Nhat Nguyen 1 年之前
父節點
當前提交
c2a3ec4263
共有 14 個文件被更改,包括 619 次插入和 184 次删除
  1. 5 0
      docs/changelog/107449.yaml
  2. 8 0
      x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java
  3. 33 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java
  4. 75 21
      x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForBoolean.java
  5. 77 23
      x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForBytesRef.java
  6. 75 21
      x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForDouble.java
  7. 74 21
      x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForInt.java
  8. 75 21
      x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForLong.java
  9. 13 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java
  10. 14 15
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilder.java
  11. 8 6
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MergePositionsOperator.java
  12. 84 24
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/X-EnrichResultBuilder.java.st
  13. 73 27
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderTests.java
  14. 5 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/MergePositionsOperatorTests.java

+ 5 - 0
docs/changelog/107449.yaml

@@ -0,0 +1,5 @@
+pr: 107449
+summary: Leverage ordinals in enrich lookup
+area: ES|QL
+type: enhancement
+issues: []

+ 8 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java

@@ -55,6 +55,14 @@ public final class OrdinalBytesRefBlock extends AbstractNonThreadSafeRefCounted
         return ordinals.getTotalValueCount() * 2 / 3 >= bytes.getPositionCount();
     }
 
+    public IntBlock getOrdinalsBlock() {
+        return ordinals;
+    }
+
+    public BytesRefVector getDictionaryVector() {
+        return bytes;
+    }
+
     @Override
     public BytesRef getBytesRef(int valueIndex, BytesRef dest) {
         return bytes.getBytesRef(ordinals.getInt(valueIndex), dest);

+ 33 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java

@@ -385,6 +385,39 @@ public class EnrichIT extends AbstractEsqlIntegTestCase {
         }
     }
 
+    /**
+     * Indexes many documents over a handful of distinct keys, intended to make the enrich lookup use ordinals
+     */
+    public void testManyDocuments() {
+        int numDocs = between(200, 2000);
+        var artists = Map.of("s1", "Eagles", "s2", "Linkin Park", "s3", "Linkin Park", "s4", "Disturbed");
+        client().admin()
+            .indices()
+            .prepareCreate("many_docs")
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1))
+            .setMapping("song_id", "type=keyword")
+            .get();
+        Map<String, Long> songs = new HashMap<>();
+        for (int i = 0; i < numDocs; i++) {
+            String song = randomFrom(artists.keySet());
+            client().prepareIndex("many_docs").setSource("song_id", song).get();
+            songs.merge(song, 1L, Long::sum);
+        }
+        client().admin().indices().prepareRefresh("many_docs").get();
+        try (EsqlQueryResponse resp = run("FROM many_docs | ENRICH songs | STATS count(*) BY artist")) {
+            List<List<Object>> values = EsqlTestUtils.getValuesList(resp);
+            Map<String, Long> actual = new HashMap<>();
+            for (List<Object> value : values) {
+                actual.merge((String) value.get(1), (Long) value.get(0), Long::sum);
+            }
+            Map<String, Long> expected = new HashMap<>();
+            for (Map.Entry<String, Long> e : songs.entrySet()) {
+                expected.merge(artists.get(e.getKey()), e.getValue(), Long::sum);
+            }
+            assertThat(actual, equalTo(expected));
+        }
+    }
+
     public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin {
 
         public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception {

+ 75 - 21
x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForBoolean.java

@@ -12,6 +12,7 @@ import org.elasticsearch.common.util.ObjectArray;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Releasables;
@@ -25,9 +26,9 @@ import java.util.Arrays;
 final class EnrichResultBuilderForBoolean extends EnrichResultBuilder {
     private ObjectArray<boolean[]> cells;
 
-    EnrichResultBuilderForBoolean(BlockFactory blockFactory, int channel, int totalPositions) {
-        super(blockFactory, channel, totalPositions);
-        this.cells = blockFactory.bigArrays().newObjectArray(totalPositions);
+    EnrichResultBuilderForBoolean(BlockFactory blockFactory, int channel) {
+        super(blockFactory, channel);
+        this.cells = blockFactory.bigArrays().newObjectArray(1);
     }
 
     @Override
@@ -39,6 +40,7 @@ final class EnrichResultBuilderForBoolean extends EnrichResultBuilder {
                 continue;
             }
             int cellPosition = positions.getInt(i);
+            cells = blockFactory.bigArrays().grow(cells, cellPosition + 1);
             final var oldCell = cells.get(cellPosition);
             final var newCell = extendCell(oldCell, valueCount);
             cells.set(cellPosition, newCell);
@@ -59,30 +61,82 @@ final class EnrichResultBuilderForBoolean extends EnrichResultBuilder {
         }
     }
 
-    @Override
-    Block build() {
-        try (BooleanBlock.Builder builder = blockFactory.newBooleanBlockBuilder(totalPositions)) {
-            for (int i = 0; i < totalPositions; i++) {
-                final var cell = cells.get(i);
-                if (cell == null) {
-                    builder.appendNull();
-                    continue;
-                }
-                if (cell.length > 1) {
-                    builder.beginPositionEntry();
-                }
-                // TODO: sort and dedup
-                for (var v : cell) {
-                    builder.appendBoolean(v);
-                }
-                if (cell.length > 1) {
-                    builder.endPositionEntry();
+    private boolean[] combineCell(boolean[] first, boolean[] second) {
+        if (first == null) {
+            return second;
+        }
+        if (second == null) {
+            return first;
+        }
+        var result = new boolean[first.length + second.length];
+        System.arraycopy(first, 0, result, 0, first.length);
+        System.arraycopy(second, 0, result, first.length, second.length);
+        return result;
+    }
+
+    private void appendGroupToBlockBuilder(BooleanBlock.Builder builder, boolean[] group) {
+        if (group == null) {
+            builder.appendNull();
+        } else if (group.length == 1) {
+            builder.appendBoolean(group[0]);
+        } else {
+            builder.beginPositionEntry();
+            // TODO: sort and dedup and set MvOrdering
+            for (var v : group) {
+                builder.appendBoolean(v);
+            }
+            builder.endPositionEntry();
+        }
+    }
+
+    private boolean[] getCellOrNull(int position) {
+        return position < cells.size() ? cells.get(position) : null;
+    }
+
+    private Block buildWithSelected(IntBlock selected) {
+        try (BooleanBlock.Builder builder = blockFactory.newBooleanBlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                int selectedCount = selected.getValueCount(i);
+                switch (selectedCount) {
+                    case 0 -> builder.appendNull();
+                    case 1 -> {
+                        int groupId = selected.getInt(selected.getFirstValueIndex(i));
+                        appendGroupToBlockBuilder(builder, getCellOrNull(groupId));
+                    }
+                    default -> {
+                        int firstValueIndex = selected.getFirstValueIndex(i);
+                        var cell = getCellOrNull(selected.getInt(firstValueIndex));
+                        for (int p = 1; p < selectedCount; p++) {
+                            int groupId = selected.getInt(firstValueIndex + p);
+                            cell = combineCell(cell, getCellOrNull(groupId));
+                        }
+                        appendGroupToBlockBuilder(builder, cell);
+                    }
                 }
             }
             return builder.build();
         }
     }
 
+    private Block buildWithSelected(IntVector selected) {
+        try (BooleanBlock.Builder builder = blockFactory.newBooleanBlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                appendGroupToBlockBuilder(builder, getCellOrNull(selected.getInt(i)));
+            }
+            return builder.build();
+        }
+    }
+
+    @Override
+    Block build(IntBlock selected) {
+        var vector = selected.asVector();
+        if (vector != null) {
+            return buildWithSelected(vector);
+        } else {
+            return buildWithSelected(selected);
+        }
+    }
+
     @Override
     public void close() {
         Releasables.close(cells, super::close);

+ 77 - 23
x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForBytesRef.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.util.ObjectArray;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Releasables;
@@ -26,14 +27,15 @@ import java.util.Arrays;
  */
 final class EnrichResultBuilderForBytesRef extends EnrichResultBuilder {
     private final BytesRefArray bytes; // shared between all cells
+    private BytesRef scratch = new BytesRef();
     private ObjectArray<int[]> cells;
 
-    EnrichResultBuilderForBytesRef(BlockFactory blockFactory, int channel, int totalPositions) {
-        super(blockFactory, channel, totalPositions);
-        this.cells = blockFactory.bigArrays().newObjectArray(totalPositions);
+    EnrichResultBuilderForBytesRef(BlockFactory blockFactory, int channel) {
+        super(blockFactory, channel);
+        this.cells = blockFactory.bigArrays().newObjectArray(1);
         BytesRefArray bytes = null;
         try {
-            bytes = new BytesRefArray(totalPositions * 3L, blockFactory.bigArrays());
+            bytes = new BytesRefArray(1L, blockFactory.bigArrays());
             this.bytes = bytes;
         } finally {
             if (bytes == null) {
@@ -52,6 +54,7 @@ final class EnrichResultBuilderForBytesRef extends EnrichResultBuilder {
                 continue;
             }
             int cellPosition = positions.getInt(i);
+            cells = blockFactory.bigArrays().grow(cells, cellPosition + 1);
             final var oldCell = cells.get(cellPosition);
             final var newCell = extendCell(oldCell, valueCount);
             cells.set(cellPosition, newCell);
@@ -75,31 +78,82 @@ final class EnrichResultBuilderForBytesRef extends EnrichResultBuilder {
         }
     }
 
-    @Override
-    Block build() {
-        try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(totalPositions)) {
-            BytesRef scratch = new BytesRef();
-            for (int i = 0; i < totalPositions; i++) {
-                final var cell = cells.get(i);
-                if (cell == null) {
-                    builder.appendNull();
-                    continue;
-                }
-                if (cell.length > 1) {
-                    builder.beginPositionEntry();
-                }
-                // TODO: sort and dedup
-                for (var v : cell) {
-                    builder.appendBytesRef(bytes.get(v, scratch));
-                }
-                if (cell.length > 1) {
-                    builder.endPositionEntry();
+    private int[] combineCell(int[] first, int[] second) {
+        if (first == null) {
+            return second;
+        }
+        if (second == null) {
+            return first;
+        }
+        var result = new int[first.length + second.length];
+        System.arraycopy(first, 0, result, 0, first.length);
+        System.arraycopy(second, 0, result, first.length, second.length);
+        return result;
+    }
+
+    private void appendGroupToBlockBuilder(BytesRefBlock.Builder builder, int[] group) {
+        if (group == null) {
+            builder.appendNull();
+        } else if (group.length == 1) {
+            builder.appendBytesRef(bytes.get(group[0], scratch));
+        } else {
+            builder.beginPositionEntry();
+            // TODO: sort and dedup and set MvOrdering
+            for (var v : group) {
+                builder.appendBytesRef(bytes.get(v, scratch));
+            }
+            builder.endPositionEntry();
+        }
+    }
+
+    private int[] getCellOrNull(int position) {
+        return position < cells.size() ? cells.get(position) : null;
+    }
+
+    private Block buildWithSelected(IntBlock selected) {
+        try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                int selectedCount = selected.getValueCount(i);
+                switch (selectedCount) {
+                    case 0 -> builder.appendNull();
+                    case 1 -> {
+                        int groupId = selected.getInt(selected.getFirstValueIndex(i));
+                        appendGroupToBlockBuilder(builder, getCellOrNull(groupId));
+                    }
+                    default -> {
+                        int firstValueIndex = selected.getFirstValueIndex(i);
+                        var cell = getCellOrNull(selected.getInt(firstValueIndex));
+                        for (int p = 1; p < selectedCount; p++) {
+                            int groupId = selected.getInt(firstValueIndex + p);
+                            cell = combineCell(cell, getCellOrNull(groupId));
+                        }
+                        appendGroupToBlockBuilder(builder, cell);
+                    }
                 }
             }
             return builder.build();
         }
     }
 
+    private Block buildWithSelected(IntVector selected) {
+        try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                appendGroupToBlockBuilder(builder, getCellOrNull(selected.getInt(i)));
+            }
+            return builder.build();
+        }
+    }
+
+    @Override
+    Block build(IntBlock selected) {
+        var vector = selected.asVector();
+        if (vector != null) {
+            return buildWithSelected(vector);
+        } else {
+            return buildWithSelected(selected);
+        }
+    }
+
     @Override
     public void close() {
         Releasables.close(bytes, cells, super::close);

+ 75 - 21
x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForDouble.java

@@ -12,6 +12,7 @@ import org.elasticsearch.common.util.ObjectArray;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Releasables;
@@ -25,9 +26,9 @@ import java.util.Arrays;
 final class EnrichResultBuilderForDouble extends EnrichResultBuilder {
     private ObjectArray<double[]> cells;
 
-    EnrichResultBuilderForDouble(BlockFactory blockFactory, int channel, int totalPositions) {
-        super(blockFactory, channel, totalPositions);
-        this.cells = blockFactory.bigArrays().newObjectArray(totalPositions);
+    EnrichResultBuilderForDouble(BlockFactory blockFactory, int channel) {
+        super(blockFactory, channel);
+        this.cells = blockFactory.bigArrays().newObjectArray(1);
     }
 
     @Override
@@ -39,6 +40,7 @@ final class EnrichResultBuilderForDouble extends EnrichResultBuilder {
                 continue;
             }
             int cellPosition = positions.getInt(i);
+            cells = blockFactory.bigArrays().grow(cells, cellPosition + 1);
             final var oldCell = cells.get(cellPosition);
             final var newCell = extendCell(oldCell, valueCount);
             cells.set(cellPosition, newCell);
@@ -59,30 +61,82 @@ final class EnrichResultBuilderForDouble extends EnrichResultBuilder {
         }
     }
 
-    @Override
-    Block build() {
-        try (DoubleBlock.Builder builder = blockFactory.newDoubleBlockBuilder(totalPositions)) {
-            for (int i = 0; i < totalPositions; i++) {
-                final var cell = cells.get(i);
-                if (cell == null) {
-                    builder.appendNull();
-                    continue;
-                }
-                if (cell.length > 1) {
-                    builder.beginPositionEntry();
-                }
-                // TODO: sort and dedup
-                for (var v : cell) {
-                    builder.appendDouble(v);
-                }
-                if (cell.length > 1) {
-                    builder.endPositionEntry();
+    private double[] combineCell(double[] first, double[] second) {
+        if (first == null) {
+            return second;
+        }
+        if (second == null) {
+            return first;
+        }
+        var result = new double[first.length + second.length];
+        System.arraycopy(first, 0, result, 0, first.length);
+        System.arraycopy(second, 0, result, first.length, second.length);
+        return result;
+    }
+
+    private void appendGroupToBlockBuilder(DoubleBlock.Builder builder, double[] group) {
+        if (group == null) {
+            builder.appendNull();
+        } else if (group.length == 1) {
+            builder.appendDouble(group[0]);
+        } else {
+            builder.beginPositionEntry();
+            // TODO: sort and dedup and set MvOrdering
+            for (var v : group) {
+                builder.appendDouble(v);
+            }
+            builder.endPositionEntry();
+        }
+    }
+
+    private double[] getCellOrNull(int position) {
+        return position < cells.size() ? cells.get(position) : null;
+    }
+
+    private Block buildWithSelected(IntBlock selected) {
+        try (DoubleBlock.Builder builder = blockFactory.newDoubleBlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                int selectedCount = selected.getValueCount(i);
+                switch (selectedCount) {
+                    case 0 -> builder.appendNull();
+                    case 1 -> {
+                        int groupId = selected.getInt(selected.getFirstValueIndex(i));
+                        appendGroupToBlockBuilder(builder, getCellOrNull(groupId));
+                    }
+                    default -> {
+                        int firstValueIndex = selected.getFirstValueIndex(i);
+                        var cell = getCellOrNull(selected.getInt(firstValueIndex));
+                        for (int p = 1; p < selectedCount; p++) {
+                            int groupId = selected.getInt(firstValueIndex + p);
+                            cell = combineCell(cell, getCellOrNull(groupId));
+                        }
+                        appendGroupToBlockBuilder(builder, cell);
+                    }
                 }
             }
             return builder.build();
         }
     }
 
+    private Block buildWithSelected(IntVector selected) {
+        try (DoubleBlock.Builder builder = blockFactory.newDoubleBlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                appendGroupToBlockBuilder(builder, getCellOrNull(selected.getInt(i)));
+            }
+            return builder.build();
+        }
+    }
+
+    @Override
+    Block build(IntBlock selected) {
+        var vector = selected.asVector();
+        if (vector != null) {
+            return buildWithSelected(vector);
+        } else {
+            return buildWithSelected(selected);
+        }
+    }
+
     @Override
     public void close() {
         Releasables.close(cells, super::close);

+ 74 - 21
x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForInt.java

@@ -25,9 +25,9 @@ import java.util.Arrays;
 final class EnrichResultBuilderForInt extends EnrichResultBuilder {
     private ObjectArray<int[]> cells;
 
-    EnrichResultBuilderForInt(BlockFactory blockFactory, int channel, int totalPositions) {
-        super(blockFactory, channel, totalPositions);
-        this.cells = blockFactory.bigArrays().newObjectArray(totalPositions);
+    EnrichResultBuilderForInt(BlockFactory blockFactory, int channel) {
+        super(blockFactory, channel);
+        this.cells = blockFactory.bigArrays().newObjectArray(1);
     }
 
     @Override
@@ -39,6 +39,7 @@ final class EnrichResultBuilderForInt extends EnrichResultBuilder {
                 continue;
             }
             int cellPosition = positions.getInt(i);
+            cells = blockFactory.bigArrays().grow(cells, cellPosition + 1);
             final var oldCell = cells.get(cellPosition);
             final var newCell = extendCell(oldCell, valueCount);
             cells.set(cellPosition, newCell);
@@ -59,30 +60,82 @@ final class EnrichResultBuilderForInt extends EnrichResultBuilder {
         }
     }
 
-    @Override
-    Block build() {
-        try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(totalPositions)) {
-            for (int i = 0; i < totalPositions; i++) {
-                final var cell = cells.get(i);
-                if (cell == null) {
-                    builder.appendNull();
-                    continue;
-                }
-                if (cell.length > 1) {
-                    builder.beginPositionEntry();
-                }
-                // TODO: sort and dedup
-                for (var v : cell) {
-                    builder.appendInt(v);
-                }
-                if (cell.length > 1) {
-                    builder.endPositionEntry();
+    private int[] combineCell(int[] first, int[] second) {
+        if (first == null) {
+            return second;
+        }
+        if (second == null) {
+            return first;
+        }
+        var result = new int[first.length + second.length];
+        System.arraycopy(first, 0, result, 0, first.length);
+        System.arraycopy(second, 0, result, first.length, second.length);
+        return result;
+    }
+
+    private void appendGroupToBlockBuilder(IntBlock.Builder builder, int[] group) {
+        if (group == null) {
+            builder.appendNull();
+        } else if (group.length == 1) {
+            builder.appendInt(group[0]);
+        } else {
+            builder.beginPositionEntry();
+            // TODO: sort and dedup and set MvOrdering
+            for (var v : group) {
+                builder.appendInt(v);
+            }
+            builder.endPositionEntry();
+        }
+    }
+
+    private int[] getCellOrNull(int position) {
+        return position < cells.size() ? cells.get(position) : null;
+    }
+
+    private Block buildWithSelected(IntBlock selected) {
+        try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                int selectedCount = selected.getValueCount(i);
+                switch (selectedCount) {
+                    case 0 -> builder.appendNull();
+                    case 1 -> {
+                        int groupId = selected.getInt(selected.getFirstValueIndex(i));
+                        appendGroupToBlockBuilder(builder, getCellOrNull(groupId));
+                    }
+                    default -> {
+                        int firstValueIndex = selected.getFirstValueIndex(i);
+                        var cell = getCellOrNull(selected.getInt(firstValueIndex));
+                        for (int p = 1; p < selectedCount; p++) {
+                            int groupId = selected.getInt(firstValueIndex + p);
+                            cell = combineCell(cell, getCellOrNull(groupId));
+                        }
+                        appendGroupToBlockBuilder(builder, cell);
+                    }
                 }
             }
             return builder.build();
         }
     }
 
+    private Block buildWithSelected(IntVector selected) {
+        try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                appendGroupToBlockBuilder(builder, getCellOrNull(selected.getInt(i)));
+            }
+            return builder.build();
+        }
+    }
+
+    @Override
+    Block build(IntBlock selected) {
+        var vector = selected.asVector();
+        if (vector != null) {
+            return buildWithSelected(vector);
+        } else {
+            return buildWithSelected(selected);
+        }
+    }
+
     @Override
     public void close() {
         Releasables.close(cells, super::close);

+ 75 - 21
x-pack/plugin/esql/src/main/generated-src/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderForLong.java

@@ -11,6 +11,7 @@ import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.util.ObjectArray;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
@@ -25,9 +26,9 @@ import java.util.Arrays;
 final class EnrichResultBuilderForLong extends EnrichResultBuilder {
     private ObjectArray<long[]> cells;
 
-    EnrichResultBuilderForLong(BlockFactory blockFactory, int channel, int totalPositions) {
-        super(blockFactory, channel, totalPositions);
-        this.cells = blockFactory.bigArrays().newObjectArray(totalPositions);
+    EnrichResultBuilderForLong(BlockFactory blockFactory, int channel) {
+        super(blockFactory, channel);
+        this.cells = blockFactory.bigArrays().newObjectArray(1);
     }
 
     @Override
@@ -39,6 +40,7 @@ final class EnrichResultBuilderForLong extends EnrichResultBuilder {
                 continue;
             }
             int cellPosition = positions.getInt(i);
+            cells = blockFactory.bigArrays().grow(cells, cellPosition + 1);
             final var oldCell = cells.get(cellPosition);
             final var newCell = extendCell(oldCell, valueCount);
             cells.set(cellPosition, newCell);
@@ -59,30 +61,82 @@ final class EnrichResultBuilderForLong extends EnrichResultBuilder {
         }
     }
 
-    @Override
-    Block build() {
-        try (LongBlock.Builder builder = blockFactory.newLongBlockBuilder(totalPositions)) {
-            for (int i = 0; i < totalPositions; i++) {
-                final var cell = cells.get(i);
-                if (cell == null) {
-                    builder.appendNull();
-                    continue;
-                }
-                if (cell.length > 1) {
-                    builder.beginPositionEntry();
-                }
-                // TODO: sort and dedup
-                for (var v : cell) {
-                    builder.appendLong(v);
-                }
-                if (cell.length > 1) {
-                    builder.endPositionEntry();
+    private long[] combineCell(long[] first, long[] second) {
+        if (first == null) {
+            return second;
+        }
+        if (second == null) {
+            return first;
+        }
+        var result = new long[first.length + second.length];
+        System.arraycopy(first, 0, result, 0, first.length);
+        System.arraycopy(second, 0, result, first.length, second.length);
+        return result;
+    }
+
+    private void appendGroupToBlockBuilder(LongBlock.Builder builder, long[] group) {
+        if (group == null) {
+            builder.appendNull();
+        } else if (group.length == 1) {
+            builder.appendLong(group[0]);
+        } else {
+            builder.beginPositionEntry();
+            // TODO: sort and dedup and set MvOrdering
+            for (var v : group) {
+                builder.appendLong(v);
+            }
+            builder.endPositionEntry();
+        }
+    }
+
+    private long[] getCellOrNull(int position) {
+        return position < cells.size() ? cells.get(position) : null;
+    }
+
+    private Block buildWithSelected(IntBlock selected) {
+        try (LongBlock.Builder builder = blockFactory.newLongBlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                int selectedCount = selected.getValueCount(i);
+                switch (selectedCount) {
+                    case 0 -> builder.appendNull();
+                    case 1 -> {
+                        int groupId = selected.getInt(selected.getFirstValueIndex(i));
+                        appendGroupToBlockBuilder(builder, getCellOrNull(groupId));
+                    }
+                    default -> {
+                        int firstValueIndex = selected.getFirstValueIndex(i);
+                        var cell = getCellOrNull(selected.getInt(firstValueIndex));
+                        for (int p = 1; p < selectedCount; p++) {
+                            int groupId = selected.getInt(firstValueIndex + p);
+                            cell = combineCell(cell, getCellOrNull(groupId));
+                        }
+                        appendGroupToBlockBuilder(builder, cell);
+                    }
                 }
             }
             return builder.build();
         }
     }
 
+    private Block buildWithSelected(IntVector selected) {
+        try (LongBlock.Builder builder = blockFactory.newLongBlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                appendGroupToBlockBuilder(builder, getCellOrNull(selected.getInt(i)));
+            }
+            return builder.build();
+        }
+    }
+
+    @Override
+    Block build(IntBlock selected) {
+        var vector = selected.asVector();
+        if (vector != null) {
+            return buildWithSelected(vector);
+        } else {
+            return buildWithSelected(selected);
+        }
+    }
+
     @Override
     public void close() {
         Releasables.close(cells, super::close);

+ 13 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

@@ -30,7 +30,10 @@ import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockStreamInput;
 import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LocalCircuitBreaker;
+import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Driver;
@@ -246,6 +249,14 @@ public class EnrichLookupService {
         ActionListener<Page> listener
     ) {
         Block inputBlock = inputPage.getBlock(0);
+        final IntBlock selectedPositions;
+        if (inputBlock instanceof OrdinalBytesRefBlock ordinalBytesRefBlock) {
+            inputBlock = ordinalBytesRefBlock.getDictionaryVector().asBlock();
+            selectedPositions = ordinalBytesRefBlock.getOrdinalsBlock();
+            selectedPositions.mustIncRef();
+        } else {
+            selectedPositions = IntVector.range(0, inputBlock.getPositionCount(), blockFactory).asBlock();
+        }
         LocalCircuitBreaker localBreaker = null;
         try {
             if (inputBlock.areAllValuesNull()) {
@@ -321,7 +332,7 @@ public class EnrichLookupService {
             // merging field-values by position
             final int[] mergingChannels = IntStream.range(0, extractFields.size()).map(i -> i + 2).toArray();
             intermediateOperators.add(
-                new MergePositionsOperator(inputPage.getPositionCount(), 1, mergingChannels, mergingTypes, driverContext.blockFactory())
+                new MergePositionsOperator(1, mergingChannels, mergingTypes, selectedPositions, driverContext.blockFactory())
             );
             AtomicReference<Page> result = new AtomicReference<>();
             OutputOperator outputOperator = new OutputOperator(List.of(), Function.identity(), result::set);
@@ -362,7 +373,7 @@ public class EnrichLookupService {
         } catch (Exception e) {
             listener.onFailure(e);
         } finally {
-            Releasables.close(localBreaker);
+            Releasables.close(selectedPositions, localBreaker);
         }
     }
 

+ 14 - 15
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilder.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.esql.enrich;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.core.Releasable;
@@ -21,13 +22,11 @@ import org.elasticsearch.core.Releasable;
 abstract class EnrichResultBuilder implements Releasable {
     protected final BlockFactory blockFactory;
     protected final int channel;
-    protected final int totalPositions;
     private long usedBytes;
 
-    EnrichResultBuilder(BlockFactory blockFactory, int channel, int totalPositions) {
+    EnrichResultBuilder(BlockFactory blockFactory, int channel) { // output size is not fixed here; it comes from the block given to build(IntBlock)
         this.blockFactory = blockFactory;
         this.channel = channel;
-        this.totalPositions = totalPositions;
     }
 
     /**
@@ -38,7 +37,7 @@ abstract class EnrichResultBuilder implements Releasable {
      */
     abstract void addInputPage(IntVector positions, Page page);
 
-    abstract Block build();
+    abstract Block build(IntBlock selected);
 
     final void adjustBreaker(long bytes) {
         blockFactory.breaker().addEstimateBytesAndMaybeBreak(bytes, "<<enrich-result>>");
@@ -50,21 +49,21 @@ abstract class EnrichResultBuilder implements Releasable {
         blockFactory.breaker().addWithoutBreaking(-usedBytes);
     }
 
-    static EnrichResultBuilder enrichResultBuilder(ElementType elementType, BlockFactory blockFactory, int channel, int totalPositions) {
+    static EnrichResultBuilder enrichResultBuilder(ElementType elementType, BlockFactory blockFactory, int channel) {
         return switch (elementType) {
-            case NULL -> new EnrichResultBuilderForNull(blockFactory, channel, totalPositions);
-            case INT -> new EnrichResultBuilderForInt(blockFactory, channel, totalPositions);
-            case LONG -> new EnrichResultBuilderForLong(blockFactory, channel, totalPositions);
-            case DOUBLE -> new EnrichResultBuilderForDouble(blockFactory, channel, totalPositions);
-            case BOOLEAN -> new EnrichResultBuilderForBoolean(blockFactory, channel, totalPositions);
-            case BYTES_REF -> new EnrichResultBuilderForBytesRef(blockFactory, channel, totalPositions);
+            case NULL -> new EnrichResultBuilderForNull(blockFactory, channel); // nested class below; builds an all-null block
+            case INT -> new EnrichResultBuilderForInt(blockFactory, channel);
+            case LONG -> new EnrichResultBuilderForLong(blockFactory, channel);
+            case DOUBLE -> new EnrichResultBuilderForDouble(blockFactory, channel);
+            case BOOLEAN -> new EnrichResultBuilderForBoolean(blockFactory, channel);
+            case BYTES_REF -> new EnrichResultBuilderForBytesRef(blockFactory, channel); // last supported type; anything else throws below
             default -> throw new IllegalArgumentException("no enrich result builder for [" + elementType + "]");
         };
     }
 
     private static class EnrichResultBuilderForNull extends EnrichResultBuilder {
-        EnrichResultBuilderForNull(BlockFactory blockFactory, int channel, int totalPositions) {
-            super(blockFactory, channel, totalPositions);
+        EnrichResultBuilderForNull(BlockFactory blockFactory, int channel) {
+            super(blockFactory, channel);
         }
 
         @Override
@@ -73,8 +72,8 @@ abstract class EnrichResultBuilder implements Releasable {
         }
 
         @Override
-        Block build() {
-            return blockFactory.newConstantNullBlock(totalPositions);
+        Block build(IntBlock selected) { // only the position count of selected is used: every output position is null
+            return blockFactory.newConstantNullBlock(selected.getPositionCount());
         }
     }
 }

+ 8 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/MergePositionsOperator.java

@@ -46,16 +46,16 @@ import java.util.Objects;
 final class MergePositionsOperator implements Operator {
     private boolean finished = false;
     private final int positionChannel;
-
     private final EnrichResultBuilder[] builders;
+    private final IntBlock selectedPositions;
 
     private Page outputPage;
 
     MergePositionsOperator(
-        int positionCount,
         int positionChannel,
         int[] mergingChannels,
         ElementType[] mergingTypes,
+        IntBlock selectedPositions,
         BlockFactory blockFactory
     ) {
         if (mergingChannels.length != mergingTypes.length) {
@@ -70,13 +70,15 @@ final class MergePositionsOperator implements Operator {
         this.builders = new EnrichResultBuilder[mergingTypes.length];
         try {
             for (int i = 0; i < mergingTypes.length; i++) {
-                builders[i] = EnrichResultBuilder.enrichResultBuilder(mergingTypes[i], blockFactory, mergingChannels[i], positionCount);
+                builders[i] = EnrichResultBuilder.enrichResultBuilder(mergingTypes[i], blockFactory, mergingChannels[i]);
             }
         } finally {
             if (builders[builders.length - 1] == null) {
-                Releasables.close(builders);
+                Releasables.close(Releasables.wrap(builders));
             }
         }
+        selectedPositions.mustIncRef();
+        this.selectedPositions = selectedPositions;
     }
 
     @Override
@@ -102,7 +104,7 @@ final class MergePositionsOperator implements Operator {
         final Block[] blocks = new Block[builders.length];
         try {
             for (int i = 0; i < builders.length; i++) {
-                blocks[i] = builders[i].build();
+                blocks[i] = builders[i].build(selectedPositions);
             }
             outputPage = new Page(blocks);
         } finally {
@@ -127,7 +129,7 @@ final class MergePositionsOperator implements Operator {
 
     @Override
     public void close() {
-        Releasables.close(Releasables.wrap(builders), () -> {
+        Releasables.close(Releasables.wrap(builders), selectedPositions, () -> {
             if (outputPage != null) {
                 outputPage.releaseBlocks();
             }

+ 84 - 24
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/X-EnrichResultBuilder.java.st

@@ -18,10 +18,15 @@ import org.elasticsearch.common.util.ObjectArray;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 $if(long)$
+import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.$Type$Block;
+$elseif(int)$
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
 $else$
 import org.elasticsearch.compute.data.$Type$Block;
+import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
 $endif$
 import org.elasticsearch.compute.data.Page;
@@ -36,16 +41,17 @@ import java.util.Arrays;
 final class EnrichResultBuilderFor$Type$ extends EnrichResultBuilder {
 $if(BytesRef)$
     private final BytesRefArray bytes; // shared between all cells
+    private BytesRef scratch = new BytesRef();
 $endif$
     private ObjectArray<$if(BytesRef)$int$else$$type$$endif$[]> cells;
 
-    EnrichResultBuilderFor$Type$(BlockFactory blockFactory, int channel, int totalPositions) {
-        super(blockFactory, channel, totalPositions);
-        this.cells = blockFactory.bigArrays().newObjectArray(totalPositions);
+    EnrichResultBuilderFor$Type$(BlockFactory blockFactory, int channel) {
+        super(blockFactory, channel);
+        this.cells = blockFactory.bigArrays().newObjectArray(1);
 $if(BytesRef)$
         BytesRefArray bytes = null;
         try {
-            bytes = new BytesRefArray(totalPositions * 3L, blockFactory.bigArrays());
+            bytes = new BytesRefArray(1L, blockFactory.bigArrays());
             this.bytes = bytes;
         } finally {
             if (bytes == null) {
@@ -67,6 +73,7 @@ $endif$
                 continue;
             }
             int cellPosition = positions.getInt(i);
+            cells = blockFactory.bigArrays().grow(cells, cellPosition + 1);
             final var oldCell = cells.get(cellPosition);
             final var newCell = extendCell(oldCell, valueCount);
             cells.set(cellPosition, newCell);
@@ -96,37 +103,90 @@ $endif$
         }
     }
 
-    @Override
-    Block build() {
-        try ($Type$Block.Builder builder = blockFactory.new$Type$BlockBuilder(totalPositions)) {
+    private $if(BytesRef)$int$else$$type$$endif$[] combineCell($if(BytesRef)$int$else$$type$$endif$[] first, $if(BytesRef)$int$else$$type$$endif$[] second) {
+        if (first == null) {
+            return second;
+        }
+        if (second == null) {
+            return first;
+        }
+        var result = new $if(BytesRef)$int$else$$type$$endif$[first.length + second.length];
+        System.arraycopy(first, 0, result, 0, first.length);
+        System.arraycopy(second, 0, result, first.length, second.length);
+        return result;
+    }
+
+    private void appendGroupToBlockBuilder($Type$Block.Builder builder, $if(BytesRef)$int$else$$type$$endif$[] group) {
+        if (group == null) { // appends exactly one output position per call; no cell for this group means a null position
+            builder.appendNull();
+        } else if (group.length == 1) { // single value: appended directly, without a position entry
 $if(BytesRef)$
-            BytesRef scratch = new BytesRef();
+            builder.appendBytesRef(bytes.get(group[0], scratch)); // BytesRef cells hold indices into the shared bytes array
+$else$
+            builder.append$Type$(group[0]);
 $endif$
-            for (int i = 0; i < totalPositions; i++) {
-                final var cell = cells.get(i);
-                if (cell == null) {
-                    builder.appendNull();
-                    continue;
-                }
-                if (cell.length > 1) {
-                    builder.beginPositionEntry();
-                }
-                // TODO: sort and dedup
-                for (var v : cell) {
+        } else { // several values: wrapped in beginPositionEntry/endPositionEntry as one multi-valued position
+            builder.beginPositionEntry();
+            // TODO: sort and dedup and set MvOrdering
+            for (var v : group) {
 $if(BytesRef)$
-                    builder.appendBytesRef(bytes.get(v, scratch));
+                builder.appendBytesRef(bytes.get(v, scratch));
 $else$
-                    builder.append$Type$(v);
+                builder.append$Type$(v);
 $endif$
-                }
-                if (cell.length > 1) {
-                    builder.endPositionEntry();
+            }
+            builder.endPositionEntry();
+        }
+    }
+
+    /**
+     * Returns the cell stored at {@code position}, or null when {@code position} lies at or
+     * beyond the current size of {@code cells}.
+     */
+    private $if(BytesRef)$int$else$$type$$endif$[] getCellOrNull(int position) {
+        if (position >= cells.size()) {
+            return null;
+        }
+        return cells.get(position);
+    }
+
+    private Block buildWithSelected(IntBlock selected) { // each position of selected holds zero, one or several cell indices
+        try ($Type$Block.Builder builder = blockFactory.new$Type$BlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                int selectedCount = selected.getValueCount(i);
+                switch (selectedCount) {
+                    case 0 -> builder.appendNull(); // position selects nothing: null output
+                    case 1 -> {
+                        int groupId = selected.getInt(selected.getFirstValueIndex(i));
+                        appendGroupToBlockBuilder(builder, getCellOrNull(groupId));
+                    }
+                    default -> { // several indices: their cells are concatenated in selection order into one output position
+                        int firstValueIndex = selected.getFirstValueIndex(i);
+                        var cell = getCellOrNull(selected.getInt(firstValueIndex));
+                        for (int p = 1; p < selectedCount; p++) {
+                            int groupId = selected.getInt(firstValueIndex + p);
+                            cell = combineCell(cell, getCellOrNull(groupId));
+                        }
+                        appendGroupToBlockBuilder(builder, cell);
+                    }
                 }
             }
             return builder.build();
         }
     }
 
+    private Block buildWithSelected(IntVector selected) {
+        try ($Type$Block.Builder builder = blockFactory.new$Type$BlockBuilder(selected.getPositionCount())) {
+            for (int i = 0; i < selected.getPositionCount(); i++) {
+                appendGroupToBlockBuilder(builder, getCellOrNull(selected.getInt(i)));
+            }
+            return builder.build();
+        }
+    }
+
+    /**
+     * Builds the output block for {@code selected}, using the vector overload when
+     * {@code selected.asVector()} returns a non-null view and the block overload otherwise.
+     */
+    @Override
+    Block build(IntBlock selected) {
+        var selectedVector = selected.asVector();
+        if (selectedVector == null) {
+            return buildWithSelected(selected);
+        }
+        return buildWithSelected(selectedVector);
+    }
+
     @Override
     public void close() {
         Releasables.close($if(BytesRef)$bytes, $endif$cells, super::close);

+ 73 - 27
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichResultBuilderTests.java

@@ -15,6 +15,8 @@ import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.test.ESTestCase;
@@ -30,10 +32,10 @@ public class EnrichResultBuilderTests extends ESTestCase {
 
     public void testBytesRef() {
         BlockFactory blockFactory = blockFactory();
-        Map<Integer, List<BytesRef>> expectedValues = new HashMap<>();
+        Map<Integer, List<BytesRef>> inputValues = new HashMap<>();
         int numPages = between(0, 10);
         int maxPosition = between(0, 100);
-        var resultBuilder = EnrichResultBuilder.enrichResultBuilder(ElementType.BYTES_REF, blockFactory, 0, maxPosition + 1);
+        var resultBuilder = EnrichResultBuilder.enrichResultBuilder(ElementType.BYTES_REF, blockFactory, 0);
         for (int i = 0; i < numPages; i++) {
             int numRows = between(1, 100);
             try (
@@ -52,7 +54,7 @@ public class EnrichResultBuilderTests extends ESTestCase {
                     }
                     for (int v = 0; v < numValues; v++) {
                         BytesRef val = new BytesRef(randomByteArrayOfLength(10));
-                        expectedValues.computeIfAbsent(position, k -> new ArrayList<>()).add(val);
+                        inputValues.computeIfAbsent(position, k -> new ArrayList<>()).add(val);
                         valuesBuilder.appendBytesRef(val);
                     }
                     if (numValues > 1) {
@@ -64,18 +66,60 @@ public class EnrichResultBuilderTests extends ESTestCase {
                 }
             }
         }
-        try (BytesRefBlock actualOutput = (BytesRefBlock) resultBuilder.build()) {
-            assertThat(actualOutput.getPositionCount(), equalTo(maxPosition + 1));
-            for (int i = 0; i < actualOutput.getPositionCount(); i++) {
-                List<BytesRef> values = expectedValues.get(i);
-                if (actualOutput.isNull(i)) {
-                    assertNull(values);
+        try (IntVector selected = IntVector.range(0, maxPosition + 1, blockFactory)) {
+            try (BytesRefBlock actualOutput = (BytesRefBlock) resultBuilder.build(selected.asBlock())) {
+                assertThat(actualOutput.getPositionCount(), equalTo(maxPosition + 1));
+                for (int i = 0; i < actualOutput.getPositionCount(); i++) {
+                    List<BytesRef> values = inputValues.get(i);
+                    if (actualOutput.isNull(i)) {
+                        assertNull(values);
+                    } else {
+                        int valueCount = actualOutput.getValueCount(i);
+                        int first = actualOutput.getFirstValueIndex(i);
+                        assertThat(valueCount, equalTo(values.size()));
+                        for (int v = 0; v < valueCount; v++) {
+                            assertThat(actualOutput.getBytesRef(first + v, new BytesRef()), equalTo(values.get(v)));
+                        }
+                    }
+                }
+            }
+        }
+        try (IntBlock.Builder selectedBuilder = blockFactory.newIntBlockBuilder(between(1, 10))) {
+            int selectedPositions = between(1, 100);
+            Map<Integer, List<BytesRef>> expectedValues = new HashMap<>();
+            for (int i = 0; i < selectedPositions; i++) {
+                int ps = randomIntBetween(0, 3);
+                List<BytesRef> values = new ArrayList<>();
+                if (ps == 0) {
+                    selectedBuilder.appendNull();
                 } else {
-                    int valueCount = actualOutput.getValueCount(i);
-                    int first = actualOutput.getFirstValueIndex(i);
-                    assertThat(valueCount, equalTo(values.size()));
-                    for (int v = 0; v < valueCount; v++) {
-                        assertThat(actualOutput.getBytesRef(first + v, new BytesRef()), equalTo(values.get(v)));
+                    selectedBuilder.beginPositionEntry();
+                    for (int p = 0; p < ps; p++) {
+                        int position = randomIntBetween(0, maxPosition);
+                        selectedBuilder.appendInt(position);
+                        values.addAll(inputValues.getOrDefault(position, List.of()));
+                    }
+                    selectedBuilder.endPositionEntry();
+                }
+                if (values.isEmpty()) {
+                    expectedValues.put(i, null);
+                } else {
+                    expectedValues.put(i, values);
+                }
+            }
+            try (var selected = selectedBuilder.build(); BytesRefBlock actualOutput = (BytesRefBlock) resultBuilder.build(selected)) {
+                assertThat(actualOutput.getPositionCount(), equalTo(selected.getPositionCount()));
+                for (int i = 0; i < actualOutput.getPositionCount(); i++) {
+                    List<BytesRef> values = expectedValues.get(i);
+                    if (actualOutput.isNull(i)) {
+                        assertNull(values);
+                    } else {
+                        int valueCount = actualOutput.getValueCount(i);
+                        int first = actualOutput.getFirstValueIndex(i);
+                        assertThat(valueCount, equalTo(values.size()));
+                        for (int v = 0; v < valueCount; v++) {
+                            assertThat(actualOutput.getBytesRef(first + v, new BytesRef()), equalTo(values.get(v)));
+                        }
                     }
                 }
             }
@@ -89,7 +133,7 @@ public class EnrichResultBuilderTests extends ESTestCase {
         Map<Integer, List<Long>> expectedValues = new HashMap<>();
         int numPages = between(0, 10);
         int maxPosition = between(0, 100);
-        var resultBuilder = EnrichResultBuilder.enrichResultBuilder(ElementType.LONG, blockFactory, 0, maxPosition + 1);
+        var resultBuilder = EnrichResultBuilder.enrichResultBuilder(ElementType.LONG, blockFactory, 0);
         for (int i = 0; i < numPages; i++) {
             int numRows = between(1, 100);
             try (
@@ -120,18 +164,20 @@ public class EnrichResultBuilderTests extends ESTestCase {
                 }
             }
         }
-        try (LongBlock actualOutput = (LongBlock) resultBuilder.build()) {
-            assertThat(actualOutput.getPositionCount(), equalTo(maxPosition + 1));
-            for (int i = 0; i < actualOutput.getPositionCount(); i++) {
-                List<Long> values = expectedValues.get(i);
-                if (actualOutput.isNull(i)) {
-                    assertNull(values);
-                } else {
-                    int valueCount = actualOutput.getValueCount(i);
-                    int first = actualOutput.getFirstValueIndex(i);
-                    assertThat(valueCount, equalTo(values.size()));
-                    for (int v = 0; v < valueCount; v++) {
-                        assertThat(actualOutput.getLong(first + v), equalTo(values.get(v)));
+        try (IntVector selected = IntVector.range(0, maxPosition + 1, blockFactory)) {
+            try (LongBlock actualOutput = (LongBlock) resultBuilder.build(selected.asBlock())) {
+                assertThat(actualOutput.getPositionCount(), equalTo(maxPosition + 1));
+                for (int i = 0; i < actualOutput.getPositionCount(); i++) {
+                    List<Long> values = expectedValues.get(i);
+                    if (actualOutput.isNull(i)) {
+                        assertNull(values);
+                    } else {
+                        int valueCount = actualOutput.getValueCount(i);
+                        int first = actualOutput.getFirstValueIndex(i);
+                        assertThat(valueCount, equalTo(values.size()));
+                        for (int v = 0; v < valueCount; v++) {
+                            assertThat(actualOutput.getLong(first + v), equalTo(values.get(v)));
+                        }
                     }
                 }
             }

+ 5 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/MergePositionsOperatorTests.java

@@ -18,7 +18,9 @@ import org.elasticsearch.compute.data.BlockUtils;
 import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.test.ESTestCase;
 
 import java.util.List;
@@ -31,11 +33,12 @@ public class MergePositionsOperatorTests extends ESTestCase {
         BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking();
         CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
         BlockFactory blockFactory = new BlockFactory(breaker, bigArrays);
+        IntVector selected = IntVector.range(0, 7, blockFactory);
         MergePositionsOperator mergeOperator = new MergePositionsOperator(
-            7,
             0,
             new int[] { 1, 2 },
             new ElementType[] { ElementType.BYTES_REF, ElementType.INT },
+            selected.asBlock(),
             blockFactory
         );
         {
@@ -123,8 +126,7 @@ public class MergePositionsOperatorTests extends ESTestCase {
         assertTrue(f2.isNull(4));
         assertThat(BlockUtils.toJavaObject(f2, 5), equalTo(2023));
         assertTrue(f2.isNull(6));
-        mergeOperator.close();
-        out.releaseBlocks();
+        Releasables.close(mergeOperator, selected, out::releaseBlocks);
         MockBigArrays.ensureAllArraysAreReleased();
     }
 }