
Implement `MV_DEDUPE` (ESQL-1287)

This implements the `MV_DEDUPE` function that removes duplicates from
multivalued fields. It wasn't strictly on our list of things we need in
the first release, but I'm grabbing it now because I realized I needed
very similar infrastructure when I was trying to build grouping by
multivalued fields. In fact, I realized that I could use our
stringtemplate code generation to generate most of the complex parts.
This generates the actual body of `MV_DEDUPE`'s implementation and the
bodies of the `Block`-accepting `BlockHash` implementations. It'll be
useful in the final step of grouping by multivalued fields.

I also got pretty curious about whether the `O(n^2)` or `O(n*log(n))`
algorithm for deduplication is faster. I'd been assuming that for all
reasonably sized inputs the `O(n^2)` bubble-sort-looking selection
algorithm was faster. So I measured it. And it's mostly true: even for
`BytesRef`, the selection algorithm is faster when you have a dozen
entries. Lower overhead and stuff. Anyway, to measure it I had to
implement the copy-and-sort `O(n*log(n))` algorithm. So while I was
there I plugged it in and selected it in cases where the number of
inputs is large and the selection algorithm is likely to be slower.
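
For reference, here is a minimal, self-contained sketch of the two strategies the generated classes pick between. The class and method names are illustrative, not the committed implementation; the real classes work position-by-position on `Block`s, as the diffs below show.

import java.util.Arrays;

class DedupeSketch {
    // O(n^2) "copy missing": copy a value only if it isn't already copied.
    // Very low overhead, so it wins on short inputs.
    static int[] copyMissing(int[] values) {
        int[] work = new int[values.length];
        int w = 0;
        outer: for (int v : values) {
            for (int j = 0; j < w; j++) {
                if (work[j] == v) {
                    continue outer;
                }
            }
            work[w++] = v;
        }
        return Arrays.copyOf(work, w);
    }

    // O(n*log(n)) "copy and sort": sort a copy, then emit the first value of each run.
    // Higher constant overhead, but it doesn't degrade on long inputs.
    static int[] copyAndSort(int[] values) {
        int[] work = values.clone();
        Arrays.sort(work);
        int w = 0;
        for (int i = 0; i < work.length; i++) {
            if (i == 0 || work[i] != work[i - 1]) {
                work[w++] = work[i];
            }
        }
        return Arrays.copyOf(work, w);
    }

    // Adaptive: use the value count as a cheap proxy for the unique count,
    // like the experimentally derived ALWAYS_COPY_MISSING cutoffs below.
    static int[] dedupeAdaptive(int[] values, int cutoff) {
        return values.length < cutoff ? copyMissing(values) : copyAndSort(values);
    }
}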
Nik Everett, 2 years ago
Commit: 1a1941913d
42 changed files with 2335 additions and 330 deletions
  1. + 1 - 1 benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java
  2. + 175 - 0 benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/MultivalueDedupeBenchmark.java
  3. + 2 - 0 docs/reference/esql/esql-functions.asciidoc
  4. + 14 - 0 docs/reference/esql/functions/mv_dedupe.asciidoc
  5. + 22 - 1 x-pack/plugin/esql/compute/build.gradle
  6. + 2 - 0 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java
  7. + 2 - 0 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java
  8. + 2 - 0 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java
  9. + 2 - 0 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java
  10. + 2 - 0 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java
  11. + 266 - 0 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeBytesRef.java
  12. + 254 - 0 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeDouble.java
  13. + 254 - 0 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeInt.java
  14. + 254 - 0 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeLong.java
  15. + 1 - 1 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java
  16. + 9 - 60 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BooleanBlockHash.java
  17. + 2 - 32 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRefBlockHash.java
  18. + 2 - 42 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java
  19. + 2 - 46 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java
  20. + 2 - 40 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java
  21. + 2 - 0 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st
  22. + 128 - 0 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MultivalueDedupe.java
  23. + 133 - 0 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MultivalueDedupeBoolean.java
  24. + 352 - 0 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/X-MultivalueDedupe.java.st
  25. + 2 - 2 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java
  26. + 11 - 5 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicBlockTests.java
  27. + 9 - 1 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderAppendBlockTests.java
  28. + 2 - 1 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java
  29. + 6 - 12 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java
  30. + 16 - 0 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockTestUtils.java
  31. + 262 - 0 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MultivalueDedupeTests.java
  32. + 2 - 2 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java
  33. + 7 - 0 x-pack/plugin/esql/qa/testFixtures/src/main/resources/floats.csv-spec
  34. + 7 - 0 x-pack/plugin/esql/qa/testFixtures/src/main/resources/ints.csv-spec
  35. + 1 - 0 x-pack/plugin/esql/qa/testFixtures/src/main/resources/show.csv-spec
  36. + 13 - 0 x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec
  37. + 0 - 80 x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToBooleanFromKeywordEvaluator.java
  38. + 2 - 0 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
  39. + 50 - 0 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvDedupe.java
  40. + 6 - 3 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java
  41. + 1 - 1 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlStatsAction.java
  42. + 53 - 0 x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvDedupeTests.java

+ 1 - 1
benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java

@@ -60,7 +60,7 @@ import java.util.stream.Stream;
 @State(Scope.Thread)
 @Fork(1)
 public class AggregatorBenchmark {
-    private static final int BLOCK_LENGTH = 8 * 1024;
+    static final int BLOCK_LENGTH = 8 * 1024;
     private static final int OP_COUNT = 1024;
     private static final int GROUPS = 5;
 

+ 175 - 0
benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/MultivalueDedupeBenchmark.java

@@ -0,0 +1,175 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.benchmark.compute.operator;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.Randomness;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.operator.MultivalueDedupe;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@Warmup(iterations = 3)
+@Measurement(iterations = 3)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@State(Scope.Thread)
+@Fork(1)
+public class MultivalueDedupeBenchmark {
+    @Param({ "BOOLEAN", "BYTES_REF", "DOUBLE", "INT", "LONG" })
+    private ElementType elementType;
+
+    @Param({ "3", "5", "10", "50", "100", "1000" })
+    private int size;
+
+    @Param({ "0", "2", "10", "100", "1000" })
+    private int repeats;
+
+    private Block block;
+
+    @Setup
+    public void setup() {
+        this.block = switch (elementType) {
+            case BOOLEAN -> {
+                BooleanBlock.Builder builder = BooleanBlock.newBlockBuilder(AggregatorBenchmark.BLOCK_LENGTH * (size + repeats));
+                for (int p = 0; p < AggregatorBenchmark.BLOCK_LENGTH; p++) {
+                    List<Boolean> values = new ArrayList<>();
+                    for (int i = 0; i < size; i++) {
+                        values.add(i % 2 == 0);
+                    }
+                    for (int r = 0; r < repeats; r++) {
+                        values.add(r < size ? r % 2 == 0 : false);
+                    }
+                    Randomness.shuffle(values);
+                    builder.beginPositionEntry();
+                    for (Boolean v : values) {
+                        builder.appendBoolean(v);
+                    }
+                    builder.endPositionEntry();
+                }
+                yield builder.build();
+            }
+            case BYTES_REF -> {
+                BytesRefBlock.Builder builder = BytesRefBlock.newBlockBuilder(AggregatorBenchmark.BLOCK_LENGTH * (size + repeats));
+                for (int p = 0; p < AggregatorBenchmark.BLOCK_LENGTH; p++) {
+                    List<BytesRef> values = new ArrayList<>();
+                    for (int i = 0; i < size; i++) {
+                        values.add(new BytesRef("SAFADFASDFSADFDAFS" + i));
+                    }
+                    for (int r = 0; r < repeats; r++) {
+                        values.add(new BytesRef("SAFADFASDFSADFDAFS" + ((r < size ? r : 0))));
+                    }
+                    Randomness.shuffle(values);
+                    builder.beginPositionEntry();
+                    for (BytesRef v : values) {
+                        builder.appendBytesRef(v);
+                    }
+                    builder.endPositionEntry();
+                }
+                yield builder.build();
+            }
+            case DOUBLE -> {
+                DoubleBlock.Builder builder = DoubleBlock.newBlockBuilder(AggregatorBenchmark.BLOCK_LENGTH * (size + repeats));
+                for (int p = 0; p < AggregatorBenchmark.BLOCK_LENGTH; p++) {
+                    List<Double> values = new ArrayList<>();
+                    for (int i = 0; i < size; i++) {
+                        values.add((double) i);
+                    }
+                    for (int r = 0; r < repeats; r++) {
+                        values.add(r < size ? (double) r : 0.0);
+                    }
+                    Randomness.shuffle(values);
+                    builder.beginPositionEntry();
+                    for (Double v : values) {
+                        builder.appendDouble(v);
+                    }
+                    builder.endPositionEntry();
+                }
+                yield builder.build();
+            }
+            case INT -> {
+                IntBlock.Builder builder = IntBlock.newBlockBuilder(AggregatorBenchmark.BLOCK_LENGTH * (size + repeats));
+                for (int p = 0; p < AggregatorBenchmark.BLOCK_LENGTH; p++) {
+                    List<Integer> values = new ArrayList<>();
+                    for (int i = 0; i < size; i++) {
+                        values.add(i);
+                    }
+                    for (int r = 0; r < repeats; r++) {
+                        values.add(r < size ? r : 0);
+                    }
+                    Randomness.shuffle(values);
+                    builder.beginPositionEntry();
+                    for (Integer v : values) {
+                        builder.appendInt(v);
+                    }
+                    builder.endPositionEntry();
+                }
+                yield builder.build();
+            }
+            case LONG -> {
+                LongBlock.Builder builder = LongBlock.newBlockBuilder(AggregatorBenchmark.BLOCK_LENGTH * (size + repeats));
+                for (int p = 0; p < AggregatorBenchmark.BLOCK_LENGTH; p++) {
+                    List<Long> values = new ArrayList<>();
+                    for (long i = 0; i < size; i++) {
+                        values.add(i);
+                    }
+                    for (int r = 0; r < repeats; r++) {
+                        values.add(r < size ? r : 0L);
+                    }
+                    Randomness.shuffle(values);
+                    builder.beginPositionEntry();
+                    for (Long v : values) {
+                        builder.appendLong(v);
+                    }
+                    builder.endPositionEntry();
+                }
+                yield builder.build();
+            }
+            default -> throw new IllegalArgumentException();
+        };
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(AggregatorBenchmark.BLOCK_LENGTH)
+    public Block adaptive() {
+        return MultivalueDedupe.dedupeToBlockAdaptive(block);
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(AggregatorBenchmark.BLOCK_LENGTH)
+    public Block copyAndSort() {
+        return MultivalueDedupe.dedupeToBlockUsingCopyAndSort(block);
+    }
+
+    @Benchmark
+    @OperationsPerInvocation(AggregatorBenchmark.BLOCK_LENGTH)
+    public Block copyMissing() {
+        return MultivalueDedupe.dedupeToBlockUsingCopyMissing(block);
+    }
+}

+ 2 - 0
docs/reference/esql/esql-functions.asciidoc

@@ -26,6 +26,7 @@ these functions:
 * <<esql-mv_avg>>
 * <<esql-mv_concat>>
 * <<esql-mv_count>>
+* <<esql-mv_dedupe>>
 * <<esql-mv_max>>
 * <<esql-mv_median>>
 * <<esql-mv_min>>
@@ -61,6 +62,7 @@ include::functions/length.asciidoc[]
 include::functions/mv_avg.asciidoc[]
 include::functions/mv_concat.asciidoc[]
 include::functions/mv_count.asciidoc[]
+include::functions/mv_dedupe.asciidoc[]
 include::functions/mv_max.asciidoc[]
 include::functions/mv_median.asciidoc[]
 include::functions/mv_min.asciidoc[]

+ 14 - 0
docs/reference/esql/functions/mv_dedupe.asciidoc

@@ -0,0 +1,14 @@
+[[esql-mv_dedupe]]
+=== `MV_DEDUPE`
+Removes duplicates from a multivalued field. For example:
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/string.csv-spec[tag=mv_dedupe]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/string.csv-spec[tag=mv_dedupe-result]
+|===
+
+NOTE: `MV_DEDUPE` may, but won't always, sort the values in the field.

+ 22 - 1
x-pack/plugin/esql/compute/build.gradle

@@ -48,7 +48,7 @@ tasks.named('stringTemplates').configure {
   var intProperties      = prop("Int", "int", "INT", "Integer.BYTES")
   var longProperties     = prop("Long", "long", "LONG", "Long.BYTES")
   var doubleProperties   = prop("Double", "double", "DOUBLE", "Double.BYTES")
-  var bytesRefProperties = prop("BytesRef", "BytesRef", "BYTES_REF", "BytesRef.BYTES")
+  var bytesRefProperties = prop("BytesRef", "BytesRef", "BYTES_REF", "org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF")
   var booleanProperties  = prop("Boolean", "boolean", "BOOLEAN", "Boolean.BYTES")
   // primitive vectors
   File vectorInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/data/X-Vector.java.st")
@@ -352,4 +352,25 @@ tasks.named('stringTemplates').configure {
     it.inputFile =  arrayStateInputFile
     it.outputFile = "org/elasticsearch/compute/aggregation/DoubleArrayState.java"
   }
+  File multivalueDedupeInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/operator/X-MultivalueDedupe.java.st")
+  template {
+    it.properties = intProperties
+    it.inputFile =  multivalueDedupeInputFile
+    it.outputFile = "org/elasticsearch/compute/operator/MultivalueDedupeInt.java"
+  }
+  template {
+    it.properties = longProperties
+    it.inputFile =  multivalueDedupeInputFile
+    it.outputFile = "org/elasticsearch/compute/operator/MultivalueDedupeLong.java"
+  }
+  template {
+    it.properties = doubleProperties
+    it.inputFile =  multivalueDedupeInputFile
+    it.outputFile = "org/elasticsearch/compute/operator/MultivalueDedupeDouble.java"
+  }
+  template {
+    it.properties = bytesRefProperties
+    it.inputFile =  multivalueDedupeInputFile
+    it.outputFile = "org/elasticsearch/compute/operator/MultivalueDedupeBytesRef.java"
+  }
 }

+ 2 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java

@@ -183,6 +183,8 @@ public sealed interface BooleanBlock extends Block permits FilterBooleanBlock, B
         @Override
         Builder mvOrdering(Block.MvOrdering mvOrdering);
 
+        // TODO boolean containsMvDups();
+
         /**
          * Appends the all values of the given block into a the current position
          * in this builder.

+ 2 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java

@@ -186,6 +186,8 @@ public sealed interface BytesRefBlock extends Block permits FilterBytesRefBlock,
         @Override
         Builder mvOrdering(Block.MvOrdering mvOrdering);
 
+        // TODO boolean containsMvDups();
+
         /**
          * Appends the all values of the given block into a the current position
          * in this builder.

+ 2 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java

@@ -184,6 +184,8 @@ public sealed interface DoubleBlock extends Block permits FilterDoubleBlock, Dou
         @Override
         Builder mvOrdering(Block.MvOrdering mvOrdering);
 
+        // TODO boolean containsMvDups();
+
         /**
          * Appends the all values of the given block into a the current position
          * in this builder.

+ 2 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java

@@ -183,6 +183,8 @@ public sealed interface IntBlock extends Block permits FilterIntBlock, IntArrayB
         @Override
         Builder mvOrdering(Block.MvOrdering mvOrdering);
 
+        // TODO boolean containsMvDups();
+
         /**
          * Appends the all values of the given block into a the current position
          * in this builder.

+ 2 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java

@@ -184,6 +184,8 @@ public sealed interface LongBlock extends Block permits FilterLongBlock, LongArr
         @Override
         Builder mvOrdering(Block.MvOrdering mvOrdering);
 
+        // TODO boolean containsMvDups();
+
         /**
          * Appends the all values of the given block into a the current position
          * in this builder.

+ 266 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeBytesRef.java

@@ -0,0 +1,266 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.LongBlock;
+
+import java.util.Arrays;
+
+/**
+ * Removes duplicate values from multivalued positions.
+ * This class is generated. Edit {@code X-MultivalueDedupe.java.st} instead.
+ */
+public class MultivalueDedupeBytesRef {
+    /**
+     * The number of entries before we switch from an {@code n^2} strategy
+     * with low overhead to an {@code n*log(n)} strategy with higher overhead.
+     * The choice of number has been experimentally derived.
+     */
+    private static final int ALWAYS_COPY_MISSING = 20;  // TODO BytesRef should try adding to the hash *first* and then comparing.
+    private final BytesRefBlock block;
+    private BytesRef[] work = new BytesRef[ArrayUtil.oversize(2, org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF)];
+    private int w;
+
+    public MultivalueDedupeBytesRef(BytesRefBlock block) {
+        this.block = block;
+        // TODO very large numbers might want a hash based implementation - and for BytesRef that might not be that big
+        fillWork(0, work.length);
+    }
+
+    /**
+     * Dedupe values using an adaptive algorithm based on the size of the input list.
+     */
+    public BytesRefBlock dedupeToBlockAdaptive() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        BytesRefBlock.Builder builder = BytesRefBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendBytesRef(block.getBytesRef(first, work[0]));
+                default -> {
+                    /*
+                     * It's better to copyMissing when there are few unique values
+                     * and better to copy and sort when there are many unique values.
+                     * The more duplicate values there are the more comparatively worse
+                     * copyAndSort is. But we don't know how many unique values there are
+                     * because our job is to find them. So we use the count of values
+                     * as a proxy that is fast to test. It's not always going to be
+                     * optimal but it has the nice property of being quite quick on
+                     * short lists and not n^2 levels of terrible on long ones.
+                     *
+                     * It'd also be possible to make a truly hybrid mechanism that
+                     * switches from copyMissing to copyUnique once it collects enough
+                     * unique values. The trouble is that the switch is expensive and
+                     * makes kind of a "hole" in the performance of that mechanism where
+                     * you may as well have just gone with either of the two other
+                     * strategies. So we just don't try it for now.
+                     */
+                    if (count < ALWAYS_COPY_MISSING) {
+                        copyMissing(first, count);
+                        writeUniquedWork(builder);
+                    } else {
+                        copyAndSort(first, count);
+                        writeSortedWork(builder);
+                    }
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values using an {@code n*log(n)} strategy with higher overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public BytesRefBlock dedupeToBlockUsingCopyAndSort() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        BytesRefBlock.Builder builder = BytesRefBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendBytesRef(block.getBytesRef(first, work[0]));
+                default -> {
+                    copyAndSort(first, count);
+                    writeSortedWork(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values using an {@code n^2} strategy with low overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public BytesRefBlock dedupeToBlockUsingCopyMissing() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        BytesRefBlock.Builder builder = BytesRefBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendBytesRef(block.getBytesRef(first, work[0]));
+                default -> {
+                    copyMissing(first, count);
+                    writeUniquedWork(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values and build a {@link LongBlock} suitable for passing
+     * as the grouping block to a {@link GroupingAggregatorFunction}.
+     */
+    public LongBlock hash(BytesRefHash hash) {
+        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> {
+                    BytesRef v = block.getBytesRef(first, work[0]);
+                    hash(builder, hash, v);
+                }
+                default -> {
+                    if (count < ALWAYS_COPY_MISSING) {
+                        copyMissing(first, count);
+                        hashUniquedWork(hash, builder);
+                    } else {
+                        copyAndSort(first, count);
+                        hashSortedWork(hash, builder);
+                    }
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    private void copyAndSort(int first, int count) {
+        grow(count);
+        int end = first + count;
+
+        w = 0;
+        for (int i = first; i < end; i++) {
+            work[w] = block.getBytesRef(i, work[w]);
+            w++;
+        }
+
+        Arrays.sort(work, 0, w);
+    }
+
+    private void copyMissing(int first, int count) {
+        grow(count);
+        int end = first + count;
+
+        work[0] = block.getBytesRef(first, work[0]);
+        w = 1;
+        i: for (int i = first + 1; i < end; i++) {
+            BytesRef v = block.getBytesRef(i, work[w]);
+            for (int j = 0; j < w; j++) {
+                if (v.equals(work[j])) {
+                    continue i;
+                }
+            }
+            work[w++] = v;
+        }
+    }
+
+    private void writeUniquedWork(BytesRefBlock.Builder builder) {
+        if (w == 1) {
+            builder.appendBytesRef(work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = 0; i < w; i++) {
+            builder.appendBytesRef(work[i]);
+        }
+        builder.endPositionEntry();
+    }
+
+    private void writeSortedWork(BytesRefBlock.Builder builder) {
+        if (w == 1) {
+            builder.appendBytesRef(work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        BytesRef prev = work[0];
+        builder.appendBytesRef(prev);
+        for (int i = 1; i < w; i++) {
+            if (false == prev.equals(work[i])) {
+                prev = work[i];
+                builder.appendBytesRef(prev);
+            }
+        }
+        builder.endPositionEntry();
+    }
+
+    private void hashUniquedWork(BytesRefHash hash, LongBlock.Builder builder) {
+        if (w == 1) {
+            hash(builder, hash, work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = 0; i < w; i++) {
+            hash(builder, hash, work[i]);
+        }
+        builder.endPositionEntry();
+    }
+
+    private void hashSortedWork(BytesRefHash hash, LongBlock.Builder builder) {
+        if (w == 1) {
+            hash(builder, hash, work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        BytesRef prev = work[0];
+        hash(builder, hash, prev);
+        for (int i = 1; i < w; i++) {
+            if (false == prev.equals(work[i])) {
+                prev = work[i];
+                hash(builder, hash, prev);
+            }
+        }
+        builder.endPositionEntry();
+    }
+
+    private void grow(int size) {
+        int prev = work.length;
+        work = ArrayUtil.grow(work, size);
+        fillWork(prev, work.length);
+    }
+
+    private void fillWork(int from, int to) {
+        for (int i = from; i < to; i++) {
+            work[i] = new BytesRef();
+        }
+    }
+
+    private void hash(LongBlock.Builder builder, BytesRefHash hash, BytesRef v) {
+        builder.appendLong(BlockHash.hashOrdToGroup(hash.add(v)));
+    }
+}
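
A rough usage sketch for the class above (the helper methods and class name here are hypothetical; `block` would come from an upstream operator, and a real caller would manage the hash's lifecycle):

import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.operator.MultivalueDedupeBytesRef;

class DedupeUsageSketch {
    // Dedupe every multivalued position, letting the class pick a strategy per position.
    static BytesRefBlock dedupe(BytesRefBlock block) {
        return new MultivalueDedupeBytesRef(block).dedupeToBlockAdaptive();
    }

    // Dedupe and hash in one pass, producing the grouping block for a
    // GroupingAggregatorFunction.
    static LongBlock dedupeAndGroup(BytesRefBlock block, BytesRefHash hash) {
        return new MultivalueDedupeBytesRef(block).hash(hash);
    }
}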

+ 254 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeDouble.java

@@ -0,0 +1,254 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.LongBlock;
+
+import java.util.Arrays;
+
+/**
+ * Removes duplicate values from multivalued positions.
+ * This class is generated. Edit {@code X-MultivalueDedupe.java.st} instead.
+ */
+public class MultivalueDedupeDouble {
+    /**
+     * The number of entries before we switch from an {@code n^2} strategy
+     * with low overhead to an {@code n*log(n)} strategy with higher overhead.
+     * The choice of number has been experimentally derived.
+     */
+    private static final int ALWAYS_COPY_MISSING = 110;
+    private final DoubleBlock block;
+    private double[] work = new double[ArrayUtil.oversize(2, Double.BYTES)];
+    private int w;
+
+    public MultivalueDedupeDouble(DoubleBlock block) {
+        this.block = block;
+    }
+
+    /**
+     * Dedupe values using an adaptive algorithm based on the size of the input list.
+     */
+    public DoubleBlock dedupeToBlockAdaptive() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        DoubleBlock.Builder builder = DoubleBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendDouble(block.getDouble(first));
+                default -> {
+                    /*
+                     * It's better to copyMissing when there are few unique values
+                     * and better to copy and sort when there are many unique values.
+                     * The more duplicate values there are the more comparatively worse
+                     * copyAndSort is. But we don't know how many unique values there are
+                     * because our job is to find them. So we use the count of values
+                     * as a proxy that is fast to test. It's not always going to be
+                     * optimal but it has the nice property of being quite quick on
+                     * short lists and not n^2 levels of terrible on long ones.
+                     *
+                     * It'd also be possible to make a truly hybrid mechanism that
+                     * switches from copyMissing to copyUnique once it collects enough
+                     * unique values. The trouble is that the switch is expensive and
+                     * makes kind of a "hole" in the performance of that mechanism where
+                     * you may as well have just gone with either of the two other
+                     * strategies. So we just don't try it for now.
+                     */
+                    if (count < ALWAYS_COPY_MISSING) {
+                        copyMissing(first, count);
+                        writeUniquedWork(builder);
+                    } else {
+                        copyAndSort(first, count);
+                        writeSortedWork(builder);
+                    }
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values using an {@code n*log(n)} strategy with higher overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public DoubleBlock dedupeToBlockUsingCopyAndSort() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        DoubleBlock.Builder builder = DoubleBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendDouble(block.getDouble(first));
+                default -> {
+                    copyAndSort(first, count);
+                    writeSortedWork(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values using an {@code n^2} strategy with low overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public DoubleBlock dedupeToBlockUsingCopyMissing() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        DoubleBlock.Builder builder = DoubleBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendDouble(block.getDouble(first));
+                default -> {
+                    copyMissing(first, count);
+                    writeUniquedWork(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values and build a {@link LongBlock} suitable for passing
+     * as the grouping block to a {@link GroupingAggregatorFunction}.
+     */
+    public LongBlock hash(LongHash hash) {
+        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> {
+                    double v = block.getDouble(first);
+                    hash(builder, hash, v);
+                }
+                default -> {
+                    if (count < ALWAYS_COPY_MISSING) {
+                        copyMissing(first, count);
+                        hashUniquedWork(hash, builder);
+                    } else {
+                        copyAndSort(first, count);
+                        hashSortedWork(hash, builder);
+                    }
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    private void copyAndSort(int first, int count) {
+        grow(count);
+        int end = first + count;
+
+        w = 0;
+        for (int i = first; i < end; i++) {
+            work[w++] = block.getDouble(i);
+        }
+
+        Arrays.sort(work, 0, w);
+    }
+
+    private void copyMissing(int first, int count) {
+        grow(count);
+        int end = first + count;
+
+        work[0] = block.getDouble(first);
+        w = 1;
+        i: for (int i = first + 1; i < end; i++) {
+            double v = block.getDouble(i);
+            for (int j = 0; j < w; j++) {
+                if (v == work[j]) {
+                    continue i;
+                }
+            }
+            work[w++] = v;
+        }
+    }
+
+    private void writeUniquedWork(DoubleBlock.Builder builder) {
+        if (w == 1) {
+            builder.appendDouble(work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = 0; i < w; i++) {
+            builder.appendDouble(work[i]);
+        }
+        builder.endPositionEntry();
+    }
+
+    private void writeSortedWork(DoubleBlock.Builder builder) {
+        if (w == 1) {
+            builder.appendDouble(work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        double prev = work[0];
+        builder.appendDouble(prev);
+        for (int i = 1; i < w; i++) {
+            if (prev != work[i]) {
+                prev = work[i];
+                builder.appendDouble(prev);
+            }
+        }
+        builder.endPositionEntry();
+    }
+
+    private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) {
+        if (w == 1) {
+            hash(builder, hash, work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = 0; i < w; i++) {
+            hash(builder, hash, work[i]);
+        }
+        builder.endPositionEntry();
+    }
+
+    private void hashSortedWork(LongHash hash, LongBlock.Builder builder) {
+        if (w == 1) {
+            hash(builder, hash, work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        double prev = work[0];
+        hash(builder, hash, prev);
+        for (int i = 1; i < w; i++) {
+            if (prev != work[i]) {
+                prev = work[i];
+                hash(builder, hash, prev);
+            }
+        }
+        builder.endPositionEntry();
+    }
+
+    private void grow(int size) {
+        work = ArrayUtil.grow(work, size);
+    }
+
+    private void hash(LongBlock.Builder builder, LongHash hash, double v) {
+        builder.appendLong(BlockHash.hashOrdToGroup(hash.add(Double.doubleToLongBits(v))));
+    }
+}

+ 254 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeInt.java

@@ -0,0 +1,254 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.LongBlock;
+
+import java.util.Arrays;
+
+/**
+ * Removes duplicate values from multivalued positions.
+ * This class is generated. Edit {@code X-MultivalueDedupe.java.st} instead.
+ */
+public class MultivalueDedupeInt {
+    /**
+     * The number of entries before we switch from an {@code n^2} strategy
+     * with low overhead to an {@code n*log(n)} strategy with higher overhead.
+     * The choice of number has been experimentally derived.
+     */
+    private static final int ALWAYS_COPY_MISSING = 300;
+    private final IntBlock block;
+    private int[] work = new int[ArrayUtil.oversize(2, Integer.BYTES)];
+    private int w;
+
+    public MultivalueDedupeInt(IntBlock block) {
+        this.block = block;
+    }
+
+    /**
+     * Dedupe values using an adaptive algorithm based on the size of the input list.
+     */
+    public IntBlock dedupeToBlockAdaptive() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        IntBlock.Builder builder = IntBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendInt(block.getInt(first));
+                default -> {
+                    /*
+                     * It's better to copyMissing when there are few unique values
+                     * and better to copy and sort when there are many unique values.
+                     * The more duplicate values there are the more comparatively worse
+                     * copyAndSort is. But we don't know how many unique values there are
+                     * because our job is to find them. So we use the count of values
+                     * as a proxy that is fast to test. It's not always going to be
+                     * optimal but it has the nice property of being quite quick on
+                     * short lists and not n^2 levels of terrible on long ones.
+                     *
+                     * It'd also be possible to make a truly hybrid mechanism that
+                     * switches from copyMissing to copyUnique once it collects enough
+                     * unique values. The trouble is that the switch is expensive and
+                     * makes kind of a "hole" in the performance of that mechanism where
+                     * you may as well have just gone with either of the two other
+                     * strategies. So we just don't try it for now.
+                     */
+                    if (count < ALWAYS_COPY_MISSING) {
+                        copyMissing(first, count);
+                        writeUniquedWork(builder);
+                    } else {
+                        copyAndSort(first, count);
+                        writeSortedWork(builder);
+                    }
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values using an {@code n*log(n)} strategy with higher overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public IntBlock dedupeToBlockUsingCopyAndSort() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        IntBlock.Builder builder = IntBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendInt(block.getInt(first));
+                default -> {
+                    copyAndSort(first, count);
+                    writeSortedWork(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values using an {@code n^2} strategy with low overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public IntBlock dedupeToBlockUsingCopyMissing() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        IntBlock.Builder builder = IntBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendInt(block.getInt(first));
+                default -> {
+                    copyMissing(first, count);
+                    writeUniquedWork(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values and build a {@link LongBlock} suitable for passing
+     * as the grouping block to a {@link GroupingAggregatorFunction}.
+     */
+    public LongBlock hash(LongHash hash) {
+        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> {
+                    int v = block.getInt(first);
+                    hash(builder, hash, v);
+                }
+                default -> {
+                    if (count < ALWAYS_COPY_MISSING) {
+                        copyMissing(first, count);
+                        hashUniquedWork(hash, builder);
+                    } else {
+                        copyAndSort(first, count);
+                        hashSortedWork(hash, builder);
+                    }
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    private void copyAndSort(int first, int count) {
+        grow(count);
+        int end = first + count;
+
+        w = 0;
+        for (int i = first; i < end; i++) {
+            work[w++] = block.getInt(i);
+        }
+
+        Arrays.sort(work, 0, w);
+    }
+
+    private void copyMissing(int first, int count) {
+        grow(count);
+        int end = first + count;
+
+        work[0] = block.getInt(first);
+        w = 1;
+        i: for (int i = first + 1; i < end; i++) {
+            int v = block.getInt(i);
+            for (int j = 0; j < w; j++) {
+                if (v == work[j]) {
+                    continue i;
+                }
+            }
+            work[w++] = v;
+        }
+    }
+
+    private void writeUniquedWork(IntBlock.Builder builder) {
+        if (w == 1) {
+            builder.appendInt(work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = 0; i < w; i++) {
+            builder.appendInt(work[i]);
+        }
+        builder.endPositionEntry();
+    }
+
+    private void writeSortedWork(IntBlock.Builder builder) {
+        if (w == 1) {
+            builder.appendInt(work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        int prev = work[0];
+        builder.appendInt(prev);
+        for (int i = 1; i < w; i++) {
+            if (prev != work[i]) {
+                prev = work[i];
+                builder.appendInt(prev);
+            }
+        }
+        builder.endPositionEntry();
+    }
+
+    private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) {
+        if (w == 1) {
+            hash(builder, hash, work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = 0; i < w; i++) {
+            hash(builder, hash, work[i]);
+        }
+        builder.endPositionEntry();
+    }
+
+    private void hashSortedWork(LongHash hash, LongBlock.Builder builder) {
+        if (w == 1) {
+            hash(builder, hash, work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        int prev = work[0];
+        hash(builder, hash, prev);
+        for (int i = 1; i < w; i++) {
+            if (prev != work[i]) {
+                prev = work[i];
+                hash(builder, hash, prev);
+            }
+        }
+        builder.endPositionEntry();
+    }
+
+    private void grow(int size) {
+        work = ArrayUtil.grow(work, size);
+    }
+
+    private void hash(LongBlock.Builder builder, LongHash hash, int v) {
+        builder.appendLong(BlockHash.hashOrdToGroup(hash.add(v)));
+    }
+}

+ 254 - 0
x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/operator/MultivalueDedupeLong.java

@@ -0,0 +1,254 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+import org.elasticsearch.compute.data.LongBlock;
+
+import java.util.Arrays;
+
+/**
+ * Removes duplicate values from multivalued positions.
+ * This class is generated. Edit {@code X-MultivalueDedupe.java.st} instead.
+ */
+public class MultivalueDedupeLong {
+    /**
+     * The number of entries before we switch from an {@code n^2} strategy
+     * with low overhead to an {@code n*log(n)} strategy with higher overhead.
+     * The choice of number has been experimentally derived.
+     */
+    private static final int ALWAYS_COPY_MISSING = 300;
+
+    private final LongBlock block;
+    private long[] work = new long[ArrayUtil.oversize(2, Long.BYTES)];
+    private int w;
+
+    public MultivalueDedupeLong(LongBlock block) {
+        this.block = block;
+    }
+
+    /**
+     * Dedupe values using an adaptive algorithm based on the size of the input list.
+     */
+    public LongBlock dedupeToBlockAdaptive() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendLong(block.getLong(first));
+                default -> {
+                    /*
+                     * It's better to copyMissing when there are few unique values
+                     * and better to copy and sort when there are many unique values.
+                     * The more duplicate values there are the more comparatively worse
+                     * copyAndSort is. But we don't know how many unique values there are
+                     * because our job is to find them. So we use the count of values
+                     * as a proxy that is fast to test. It's not always going to be
+                     * optimal but it has the nice property of being quite quick on
+                     * short lists and not n^2 levels of terrible on long ones.
+                     *
+                     * It'd also be possible to make a truly hybrid mechanism that
+                     * switches from copyMissing to copyUnique once it collects enough
+                     * unique values. The trouble is that the switch is expensive and
+                     * makes kind of a "hole" in the performance of that mechanism where
+                     * you may as well have just gone with either of the two other
+                     * strategies. So we just don't try it for now.
+                     */
+                    if (count < ALWAYS_COPY_MISSING) {
+                        copyMissing(first, count);
+                        writeUniquedWork(builder);
+                    } else {
+                        copyAndSort(first, count);
+                        writeSortedWork(builder);
+                    }
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values using an {@code n*log(n)} strategy with higher overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public LongBlock dedupeToBlockUsingCopyAndSort() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendLong(block.getLong(first));
+                default -> {
+                    copyAndSort(first, count);
+                    writeSortedWork(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values using an {@code n^2} strategy with low overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public LongBlock dedupeToBlockUsingCopyMissing() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendLong(block.getLong(first));
+                default -> {
+                    copyMissing(first, count);
+                    writeUniquedWork(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values and build a {@link LongBlock} suitable for passing
+     * as the grouping block to a {@link GroupingAggregatorFunction}.
+     */
+    public LongBlock hash(LongHash hash) {
+        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> {
+                    long v = block.getLong(first);
+                    hash(builder, hash, v);
+                }
+                default -> {
+                    if (count < ALWAYS_COPY_MISSING) {
+                        copyMissing(first, count);
+                        hashUniquedWork(hash, builder);
+                    } else {
+                        copyAndSort(first, count);
+                        hashSortedWork(hash, builder);
+                    }
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    private void copyAndSort(int first, int count) {
+        grow(count);
+        int end = first + count;
+
+        w = 0;
+        for (int i = first; i < end; i++) {
+            work[w++] = block.getLong(i);
+        }
+
+        Arrays.sort(work, 0, w);
+    }
+
+    private void copyMissing(int first, int count) {
+        grow(count);
+        int end = first + count;
+
+        work[0] = block.getLong(first);
+        w = 1;
+        i: for (int i = first + 1; i < end; i++) {
+            long v = block.getLong(i);
+            for (int j = 0; j < w; j++) {
+                if (v == work[j]) {
+                    continue i;
+                }
+            }
+            work[w++] = v;
+        }
+    }
+
+    private void writeUniquedWork(LongBlock.Builder builder) {
+        if (w == 1) {
+            builder.appendLong(work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = 0; i < w; i++) {
+            builder.appendLong(work[i]);
+        }
+        builder.endPositionEntry();
+    }
+
+    private void writeSortedWork(LongBlock.Builder builder) {
+        if (w == 1) {
+            builder.appendLong(work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        long prev = work[0];
+        builder.appendLong(prev);
+        for (int i = 1; i < w; i++) {
+            if (prev != work[i]) {
+                prev = work[i];
+                builder.appendLong(prev);
+            }
+        }
+        builder.endPositionEntry();
+    }
+
+    private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) {
+        if (w == 1) {
+            hash(builder, hash, work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = 0; i < w; i++) {
+            hash(builder, hash, work[i]);
+        }
+        builder.endPositionEntry();
+    }
+
+    private void hashSortedWork(LongHash hash, LongBlock.Builder builder) {
+        if (w == 1) {
+            hash(builder, hash, work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        long prev = work[0];
+        hash(builder, hash, prev);
+        for (int i = 1; i < w; i++) {
+            if (prev != work[i]) {
+                prev = work[i];
+                hash(builder, hash, prev);
+            }
+        }
+        builder.endPositionEntry();
+    }
+
+    private void grow(int size) {
+        work = ArrayUtil.grow(work, size);
+    }
+
+    private void hash(LongBlock.Builder builder, LongHash hash, long v) {
+        builder.appendLong(BlockHash.hashOrdToGroup(hash.add(v)));
+    }
+}

+ 1 - 1
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java

@@ -87,7 +87,7 @@ public abstract sealed class BlockHash implements Releasable //
         };
     }
 
-    protected static long hashOrdToGroup(long ord) {
+    public static long hashOrdToGroup(long ord) {
         if (ord < 0) { // already seen
             return -1 - ord;
         }
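
Making `hashOrdToGroup` public lets the generated dedupe code translate hash ordinals
into group ids. A minimal sketch of the contract, assuming (as the snippet above shows)
that the hash's `add` returns a fresh ordinal for a new value and `-1 - existingOrd` for
a value it has already seen:

    LongHash hash = new LongHash(1, BigArrays.NON_RECYCLING_INSTANCE);
    long first = BlockHash.hashOrdToGroup(hash.add(42));  // 0: 42 is new, add returned 0
    long again = BlockHash.hashOrdToGroup(hash.add(42));  // 0: add returned -1 - 0, normalized back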

+ 9 - 60
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BooleanBlockHash.java

@@ -14,6 +14,7 @@ import org.elasticsearch.compute.data.LongArrayVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.MultivalueDedupeBoolean;
 
 /**
  * Assigns group {@code 0} to the first of {@code true} or {@code false}
@@ -21,9 +22,7 @@ import org.elasticsearch.compute.data.Page;
  */
 final class BooleanBlockHash extends BlockHash {
     private final int channel;
-
-    private boolean seenFalse;
-    private boolean seenTrue;
+    private final boolean[] everSeen = new boolean[2];
 
     BooleanBlockHash(int channel) {
         this.channel = channel;
@@ -42,72 +41,22 @@ final class BooleanBlockHash extends BlockHash {
     private LongVector add(BooleanVector vector) {
         long[] groups = new long[vector.getPositionCount()];
         for (int i = 0; i < vector.getPositionCount(); i++) {
-            groups[i] = ord(vector.getBoolean(i));
+            groups[i] = MultivalueDedupeBoolean.hashOrd(everSeen, vector.getBoolean(i));
         }
         return new LongArrayVector(groups, groups.length);
     }
 
     private LongBlock add(BooleanBlock block) {
-        boolean seenTrueThisPosition = false;
-        boolean seenFalseThisPosition = false;
-        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getTotalValueCount());
-        for (int p = 0; p < block.getPositionCount(); p++) {
-            if (block.isNull(p)) {
-                builder.appendNull();
-                continue;
-            }
-            int start = block.getFirstValueIndex(p);
-            int count = block.getValueCount(p);
-            if (count == 1) {
-                builder.appendLong(ord(block.getBoolean(start)));
-                continue;
-            }
-            seenTrueThisPosition = false;
-            seenFalseThisPosition = false;
-            builder.beginPositionEntry();
-            int end = start + count;
-            for (int offset = start; offset < end; offset++) {
-                if (block.getBoolean(offset)) {
-                    if (false == seenTrueThisPosition) {
-                        builder.appendLong(1);
-                        seenTrueThisPosition = true;
-                        seenTrue = true;
-                        if (seenFalseThisPosition) {
-                            break;
-                        }
-                    }
-                } else {
-                    if (false == seenFalseThisPosition) {
-                        builder.appendLong(0);
-                        seenFalseThisPosition = true;
-                        seenFalse = true;
-                        if (seenTrueThisPosition) {
-                            break;
-                        }
-                    }
-                }
-            }
-            builder.endPositionEntry();
-        }
-        return builder.build();
-    }
-
-    private long ord(boolean b) {
-        if (b) {
-            seenTrue = true;
-            return 1;
-        }
-        seenFalse = true;
-        return 0;
+        return new MultivalueDedupeBoolean(block).hash(everSeen);
     }
 
     @Override
     public BooleanBlock[] getKeys() {
         BooleanVector.Builder builder = BooleanVector.newVectorBuilder(2);
-        if (seenFalse) {
+        if (everSeen[0]) {
             builder.appendBoolean(false);
         }
-        if (seenTrue) {
+        if (everSeen[1]) {
             builder.appendBoolean(true);
         }
         return new BooleanBlock[] { builder.build().asBlock() };
@@ -116,10 +65,10 @@ final class BooleanBlockHash extends BlockHash {
     @Override
     public IntVector nonEmpty() {
         IntVector.Builder builder = IntVector.newVectorBuilder(2);
-        if (seenFalse) {
+        if (everSeen[0]) {
             builder.appendInt(0);
         }
-        if (seenTrue) {
+        if (everSeen[1]) {
             builder.appendInt(1);
         }
         return builder.build();
@@ -132,6 +81,6 @@ final class BooleanBlockHash extends BlockHash {
 
     @Override
     public String toString() {
-        return "BooleanBlockHash{channel=" + channel + ", seenFalse=" + seenFalse + ", seenTrue=" + seenTrue + '}';
+        return "BooleanBlockHash{channel=" + channel + ", seenFalse=" + everSeen[0] + ", seenTrue=" + everSeen[1] + '}';
     }
 }
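
The boolean "hash" needs no hash table at all: `hashOrd` is a fixed ordinal assignment
(false is 0, true is 1) that also records which values have appeared. A small sketch of
how the shared `everSeen` array behaves, with hypothetical values:

    boolean[] everSeen = new boolean[2];                        // [0] tracks false, [1] tracks true
    long t = MultivalueDedupeBoolean.hashOrd(everSeen, true);   // returns 1, sets everSeen[1]
    long f = MultivalueDedupeBoolean.hashOrd(everSeen, false);  // returns 0, sets everSeen[0]
    // getKeys() and nonEmpty() later read everSeen to emit only the values actually encountered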

+ 2 - 32
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BytesRefBlockHash.java

@@ -7,7 +7,6 @@
 
 package org.elasticsearch.compute.aggregation.blockhash;
 
-import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -23,6 +22,7 @@ import org.elasticsearch.compute.data.LongArrayVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.MultivalueDedupeBytesRef;
 
 import java.io.IOException;
 
@@ -54,38 +54,8 @@ final class BytesRefBlockHash extends BlockHash {
         return new LongArrayVector(groups, vector.getPositionCount());
     }
 
-    private static final long[] EMPTY = new long[0];
-
     private LongBlock add(BytesRefBlock block) {
-        long[] seen = EMPTY;
-        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getTotalValueCount());
-        for (int p = 0; p < block.getPositionCount(); p++) {
-            if (block.isNull(p)) {
-                builder.appendNull();
-                continue;
-            }
-            int start = block.getFirstValueIndex(p);
-            int count = block.getValueCount(p);
-            if (count == 1) {
-                builder.appendLong(hashOrdToGroup(bytesRefHash.add(block.getBytesRef(start, bytes))));
-                continue;
-            }
-            if (seen.length < count) {
-                seen = new long[ArrayUtil.oversize(count, Long.BYTES)];
-            }
-            builder.beginPositionEntry();
-            // TODO if we know the elements were in sorted order we wouldn't need an array at all.
-            // TODO we could also have an assertion that there aren't any duplicates on the block.
-            // Lucene has them in ascending order without duplicates
-            int end = start + count;
-            int nextSeen = 0;
-            for (int offset = start; offset < end; offset++) {
-                long ord = bytesRefHash.add(block.getBytesRef(offset, bytes));
-                nextSeen = addOrd(builder, seen, nextSeen, ord);
-            }
-            builder.endPositionEntry();
-        }
-        return builder.build();
+        return new MultivalueDedupeBytesRef(block).hash(bytesRefHash);
     }
 
     protected static int addOrd(LongBlock.Builder builder, long[] seen, int nextSeen, long ord) {

+ 2 - 42
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/DoubleBlockHash.java

@@ -7,7 +7,6 @@
 
 package org.elasticsearch.compute.aggregation.blockhash;
 
-import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.compute.data.DoubleArrayVector;
@@ -18,6 +17,7 @@ import org.elasticsearch.compute.data.LongArrayVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.MultivalueDedupeDouble;
 
 final class DoubleBlockHash extends BlockHash {
     private final int channel;
@@ -46,48 +46,8 @@ final class DoubleBlockHash extends BlockHash {
         return new LongArrayVector(groups, groups.length);
     }
 
-    private static final double[] EMPTY = new double[0];
-
     private LongBlock add(DoubleBlock block) {
-        double[] seen = EMPTY;
-        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getTotalValueCount());
-        for (int p = 0; p < block.getPositionCount(); p++) {
-            if (block.isNull(p)) {
-                builder.appendNull();
-                continue;
-            }
-            int start = block.getFirstValueIndex(p);
-            int count = block.getValueCount(p);
-            if (count == 1) {
-                builder.appendLong(hashOrdToGroup(longHash.add(Double.doubleToLongBits(block.getDouble(start)))));
-                continue;
-            }
-            if (seen.length < count) {
-                seen = new double[ArrayUtil.oversize(count, Double.BYTES)];
-            }
-            builder.beginPositionEntry();
-            // TODO if we know the elements were in sorted order we wouldn't need an array at all.
-            // TODO we could also have an assertion that there aren't any duplicates on the block.
-            // Lucene has them in ascending order without duplicates
-            int end = start + count;
-            int nextSeen = 0;
-            for (int offset = start; offset < end; offset++) {
-                nextSeen = add(builder, seen, nextSeen, block.getDouble(offset));
-            }
-            builder.endPositionEntry();
-        }
-        return builder.build();
-    }
-
-    protected int add(LongBlock.Builder builder, double[] seen, int nextSeen, double value) {
-        for (int j = 0; j < nextSeen; j++) {
-            if (seen[j] == value) {
-                return nextSeen;
-            }
-        }
-        seen[nextSeen] = value;
-        builder.appendLong(hashOrdToGroup(longHash.add(Double.doubleToLongBits(value))));
-        return nextSeen + 1;
+        return new MultivalueDedupeDouble(block).hash(longHash);
     }
 
     @Override

+ 2 - 46
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/IntBlockHash.java

@@ -7,7 +7,6 @@
 
 package org.elasticsearch.compute.aggregation.blockhash;
 
-import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.compute.data.IntArrayVector;
@@ -17,6 +16,7 @@ import org.elasticsearch.compute.data.LongArrayVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.MultivalueDedupeInt;
 
 final class IntBlockHash extends BlockHash {
     private final int channel;
@@ -45,52 +45,8 @@ final class IntBlockHash extends BlockHash {
         return new LongArrayVector(groups, groups.length);
     }
 
-    private static final int[] EMPTY = new int[0];
-
     private LongBlock add(IntBlock block) {
-        int[] seen = EMPTY;
-        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getTotalValueCount());
-        for (int p = 0; p < block.getPositionCount(); p++) {
-            if (block.isNull(p)) {
-                builder.appendNull();
-                continue;
-            }
-            int start = block.getFirstValueIndex(p);
-            int count = block.getValueCount(p);
-            if (count == 1) {
-                builder.appendLong(hashOrdToGroup(longHash.add(block.getInt(start))));
-                continue;
-            }
-            if (seen.length < count) {
-                seen = new int[ArrayUtil.oversize(count, Integer.BYTES)];
-            }
-            builder.beginPositionEntry();
-            // TODO if we know the elements were in sorted order we wouldn't need an array at all.
-            // TODO we could also have an assertion that there aren't any duplicates on the block.
-            // Lucene has them in ascending order without duplicates
-            int end = start + count;
-            int nextSeen = 0;
-            for (int offset = start; offset < end; offset++) {
-                nextSeen = add(builder, seen, nextSeen, block.getInt(offset));
-            }
-            builder.endPositionEntry();
-        }
-        return builder.build();
-    }
-
-    private int add(LongBlock.Builder builder, int[] seen, int nextSeen, int value) {
-        /*
-         * Check if we've seen the value before. This is n^2 on the number of
-         * values, but we don't expect many of them in each entry.
-         */
-        for (int j = 0; j < nextSeen; j++) {
-            if (seen[j] == value) {
-                return nextSeen;
-            }
-        }
-        seen[nextSeen] = value;
-        builder.appendLong(hashOrdToGroup(longHash.add(value)));
-        return nextSeen + 1;
+        return new MultivalueDedupeInt(block).hash(longHash);
     }
 
     @Override

+ 2 - 40
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/LongBlockHash.java

@@ -7,7 +7,6 @@
 
 package org.elasticsearch.compute.aggregation.blockhash;
 
-import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.LongHash;
 import org.elasticsearch.compute.data.IntVector;
@@ -15,6 +14,7 @@ import org.elasticsearch.compute.data.LongArrayVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.MultivalueDedupeLong;
 
 final class LongBlockHash extends BlockHash {
     private final int channel;
@@ -46,45 +46,7 @@ final class LongBlockHash extends BlockHash {
     private static final long[] EMPTY = new long[0];
 
     private LongBlock add(LongBlock block) {
-        long[] seen = EMPTY;
-        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getTotalValueCount());
-        for (int p = 0; p < block.getPositionCount(); p++) {
-            if (block.isNull(p)) {
-                builder.appendNull();
-                continue;
-            }
-            int start = block.getFirstValueIndex(p);
-            int count = block.getValueCount(p);
-            if (count == 1) {
-                builder.appendLong(hashOrdToGroup(longHash.add(block.getLong(start))));
-                continue;
-            }
-            if (seen.length < count) {
-                seen = new long[ArrayUtil.oversize(count, Long.BYTES)];
-            }
-            builder.beginPositionEntry();
-            // TODO if we know the elements were in sorted order we wouldn't need an array at all.
-            // TODO we could also have an assertion that there aren't any duplicates on the block.
-            // Lucene has them in ascending order without duplicates
-            int end = start + count;
-            int nextSeen = 0;
-            for (int offset = start; offset < end; offset++) {
-                nextSeen = add(builder, seen, nextSeen, block.getLong(offset));
-            }
-            builder.endPositionEntry();
-        }
-        return builder.build();
-    }
-
-    private int add(LongBlock.Builder builder, long[] seen, int nextSeen, long value) {
-        for (int j = 0; j < nextSeen; j++) {
-            if (seen[j] == value) {
-                return nextSeen;
-            }
-        }
-        seen[nextSeen] = value;
-        builder.appendLong(hashOrdToGroup(longHash.add(value)));
-        return nextSeen + 1;
+        return new MultivalueDedupeLong(block).hash(longHash);
     }
 
     @Override

+ 2 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st

@@ -220,6 +220,8 @@ $endif$
         @Override
         Builder mvOrdering(Block.MvOrdering mvOrdering);
 
+        // TODO boolean containsMvDups();
+
         /**
          * Appends all values of the given block into the current position
          * in this builder.

+ 128 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MultivalueDedupe.java

@@ -0,0 +1,128 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.Page;
+
+import java.util.function.Supplier;
+
+public final class MultivalueDedupe {
+    /**
+     * Dedupe values using an adaptive algorithm based on the size of the input list.
+     */
+    public static Block dedupeToBlockAdaptive(Block block) {
+        return switch (block.elementType()) {
+            case BOOLEAN -> new MultivalueDedupeBoolean((BooleanBlock) block).dedupeToBlock();
+            case BYTES_REF -> new MultivalueDedupeBytesRef((BytesRefBlock) block).dedupeToBlockAdaptive();
+            case INT -> new MultivalueDedupeInt((IntBlock) block).dedupeToBlockAdaptive();
+            case LONG -> new MultivalueDedupeLong((LongBlock) block).dedupeToBlockAdaptive();
+            case DOUBLE -> new MultivalueDedupeDouble((DoubleBlock) block).dedupeToBlockAdaptive();
+            default -> throw new IllegalArgumentException();
+        };
+    }
+
+    /**
+     * Dedupe values using an {@code n^2} strategy with low overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public static Block dedupeToBlockUsingCopyMissing(Block block) {
+        return switch (block.elementType()) {
+            case BOOLEAN -> new MultivalueDedupeBoolean((BooleanBlock) block).dedupeToBlock();
+            case BYTES_REF -> new MultivalueDedupeBytesRef((BytesRefBlock) block).dedupeToBlockUsingCopyMissing();
+            case INT -> new MultivalueDedupeInt((IntBlock) block).dedupeToBlockUsingCopyMissing();
+            case LONG -> new MultivalueDedupeLong((LongBlock) block).dedupeToBlockUsingCopyMissing();
+            case DOUBLE -> new MultivalueDedupeDouble((DoubleBlock) block).dedupeToBlockUsingCopyMissing();
+            default -> throw new IllegalArgumentException();
+        };
+    }
+
+    /**
+     * Dedupe values using an {@code n*log(n)} strategy with higher overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public static Block dedupeToBlockUsingCopyAndSort(Block block) {
+        return switch (block.elementType()) {
+            case BOOLEAN -> new MultivalueDedupeBoolean((BooleanBlock) block).dedupeToBlock();
+            case BYTES_REF -> new MultivalueDedupeBytesRef((BytesRefBlock) block).dedupeToBlockUsingCopyAndSort();
+            case INT -> new MultivalueDedupeInt((IntBlock) block).dedupeToBlockUsingCopyAndSort();
+            case LONG -> new MultivalueDedupeLong((LongBlock) block).dedupeToBlockUsingCopyAndSort();
+            case DOUBLE -> new MultivalueDedupeDouble((DoubleBlock) block).dedupeToBlockUsingCopyAndSort();
+            default -> throw new IllegalArgumentException();
+        };
+    }
+
+    /**
+     * Build an {@link EvalOperator.ExpressionEvaluator} that deduplicates values.
+     */
+    public static Supplier<EvalOperator.ExpressionEvaluator> evaluator(
+        ElementType elementType,
+        Supplier<EvalOperator.ExpressionEvaluator> nextSupplier
+    ) {
+        return switch (elementType) {
+            case BOOLEAN -> () -> new MvDedupeEvaluator(nextSupplier.get()) {
+                @Override
+                public Block eval(Page page) {
+                    return new MultivalueDedupeBoolean((BooleanBlock) field.eval(page)).dedupeToBlock();
+                }
+            };
+            case BYTES_REF -> () -> new MvDedupeEvaluator(nextSupplier.get()) {
+                @Override
+                public Block eval(Page page) {
+                    return new MultivalueDedupeBytesRef((BytesRefBlock) field.eval(page)).dedupeToBlockAdaptive();
+                }
+            };
+            case INT -> () -> new MvDedupeEvaluator(nextSupplier.get()) {
+                @Override
+                public Block eval(Page page) {
+                    return new MultivalueDedupeInt((IntBlock) field.eval(page)).dedupeToBlockAdaptive();
+                }
+            };
+            case LONG -> () -> new MvDedupeEvaluator(nextSupplier.get()) {
+                @Override
+                public Block eval(Page page) {
+                    return new MultivalueDedupeLong((LongBlock) field.eval(page)).dedupeToBlockAdaptive();
+                }
+            };
+            case DOUBLE -> () -> new MvDedupeEvaluator(nextSupplier.get()) {
+                @Override
+                public Block eval(Page page) {
+                    return new MultivalueDedupeDouble((DoubleBlock) field.eval(page)).dedupeToBlockAdaptive();
+                }
+            };
+            case NULL -> () -> new MvDedupeEvaluator(nextSupplier.get()) {
+                @Override
+                public Block eval(Page page) {
+                    return field.eval(page); // The page is all nulls and when you dedupe that it's still all nulls
+                }
+            };
+            default -> throw new IllegalArgumentException("unsupported type [" + elementType + "]");
+        };
+    }
+
+    private abstract static class MvDedupeEvaluator implements EvalOperator.ExpressionEvaluator {
+        protected final EvalOperator.ExpressionEvaluator field;
+
+        private MvDedupeEvaluator(EvalOperator.ExpressionEvaluator field) {
+            this.field = field;
+        }
+
+        @Override
+        public String toString() {
+            return "MvDedupe[field=" + field + "]";
+        }
+    }
+
+    private MultivalueDedupe() {}
+}
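
For reference, a usage sketch of the new entry point, built only from APIs in this diff
(the values are hypothetical): dedupe a single position that carries a duplicate.

    LongBlock.Builder builder = LongBlock.newBlockBuilder(1);
    builder.beginPositionEntry();
    builder.appendLong(2);
    builder.appendLong(1);
    builder.appendLong(2);  // duplicate within the position
    builder.endPositionEntry();
    Block deduped = MultivalueDedupe.dedupeToBlockAdaptive(builder.build());
    // The position now holds two values. Short lists take the copyMissing path, which
    // keeps first-seen order ({2, 1}); lists of ALWAYS_COPY_MISSING values or more are
    // sorted first, so callers should not rely on output order.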

+ 133 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MultivalueDedupeBoolean.java

@@ -0,0 +1,133 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.LongBlock;
+
+/**
+ * Removes duplicate values from multivalued positions.
+ */
+public class MultivalueDedupeBoolean {
+    private final BooleanBlock block;
+    private boolean seenTrue;
+    private boolean seenFalse;
+
+    public MultivalueDedupeBoolean(BooleanBlock block) {
+        this.block = block;
+    }
+
+    /**
+     * Dedupe values using an adaptive algorithm based on the size of the input list.
+     */
+    public BooleanBlock dedupeToBlock() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        BooleanBlock.Builder builder = BooleanBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendBoolean(block.getBoolean(first));
+                default -> {
+                    readValues(first, count);
+                    writeValues(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values and build a {@link LongBlock} suitable for passing
+     * as the grouping block to a {@link GroupingAggregatorFunction}.
+     * @param everSeen array tracking if the values {@code false} and {@code true} are ever seen
+     */
+    public LongBlock hash(boolean[] everSeen) {
+        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> builder.appendLong(hashOrd(everSeen, block.getBoolean(first)));
+                default -> {
+                    readValues(first, count);
+                    hashValues(everSeen, builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    private void readValues(int first, int count) {
+        int end = first + count;
+
+        seenFalse = false;
+        seenTrue = false;
+        for (int i = first; i < end; i++) {
+            if (block.getBoolean(i)) {
+                seenTrue = true;
+                if (seenFalse) {
+                    break;
+                }
+            } else {
+                seenFalse = true;
+                if (seenTrue) {
+                    break;
+                }
+            }
+        }
+    }
+
+    private void writeValues(BooleanBlock.Builder builder) {
+        if (seenFalse) {
+            if (seenTrue) {
+                builder.beginPositionEntry();
+                builder.appendBoolean(false);
+                builder.appendBoolean(true);
+                builder.endPositionEntry();
+            } else {
+                builder.appendBoolean(false);
+            }
+        } else if (seenTrue) {
+            builder.appendBoolean(true);
+        } else {
+            throw new IllegalStateException("didn't see true or false but counted values");
+        }
+    }
+
+    private void hashValues(boolean[] everSeen, LongBlock.Builder builder) {
+        if (seenFalse) {
+            if (seenTrue) {
+                builder.beginPositionEntry();
+                builder.appendLong(hashOrd(everSeen, false));
+                builder.appendLong(hashOrd(everSeen, true));
+                builder.endPositionEntry();
+            } else {
+                builder.appendLong(hashOrd(everSeen, false));
+            }
+        } else if (seenTrue) {
+            builder.appendLong(hashOrd(everSeen, true));
+        } else {
+            throw new IllegalStateException("didn't see true or false but counted values");
+        }
+    }
+
+    public static long hashOrd(boolean[] everSeen, boolean b) {
+        if (b) {
+            everSeen[1] = true;
+            return 1;
+        }
+        everSeen[0] = true;
+        return 0;
+    }
+}
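
A quick sketch of the boolean dedupe in action (hypothetical values; the false-before-true
output order follows from writeValues above):

    BooleanBlock.Builder b = BooleanBlock.newBlockBuilder(1);
    b.beginPositionEntry();
    b.appendBoolean(true);
    b.appendBoolean(false);
    b.appendBoolean(true);
    b.endPositionEntry();
    BooleanBlock deduped = new MultivalueDedupeBoolean(b.build()).dedupeToBlock();
    // position 0 is now {false, true}: readValues tracks two flags and
    // writeValues always emits false before true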

+ 352 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/X-MultivalueDedupe.java.st

@@ -0,0 +1,352 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.apache.lucene.util.ArrayUtil;
+$if(BytesRef)$
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.util.BytesRefHash;
+$else$
+import org.elasticsearch.common.util.LongHash;
+$endif$
+import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
+import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
+$if(long)$
+import org.elasticsearch.compute.data.LongBlock;
+
+$else$
+import org.elasticsearch.compute.data.$Type$Block;
+import org.elasticsearch.compute.data.LongBlock;
+$endif$
+
+import java.util.Arrays;
+
+/**
+ * Removes duplicate values from multivalued positions.
+ * This class is generated. Edit {@code X-MultivalueDedupe.java.st} instead.
+ */
+public class MultivalueDedupe$Type$ {
+    /**
+     * The number of entries before we switch from an {@code n^2} strategy
+     * with low overhead to an {@code n*log(n)} strategy with higher overhead.
+     * The choice of number has been experimentally derived.
+     */
+$if(BytesRef)$
+    private static final int ALWAYS_COPY_MISSING = 20;  // TODO BytesRef should try adding to the hash *first* and then comparing.
+$elseif(double)$
+    private static final int ALWAYS_COPY_MISSING = 110;
+$elseif(int)$
+    private static final int ALWAYS_COPY_MISSING = 300;
+$elseif(long)$
+    private static final int ALWAYS_COPY_MISSING = 300;
+$endif$
+
+    private final $Type$Block block;
+    private $type$[] work = new $type$[ArrayUtil.oversize(2, $BYTES$)];
+    private int w;
+
+    public MultivalueDedupe$Type$($Type$Block block) {
+        this.block = block;
+$if(BytesRef)$
+        // TODO very large numbers might want a hash based implementation - and for BytesRef that might not be that big
+        fillWork(0, work.length);
+$endif$
+    }
+
+    /**
+     * Dedupe values using an adaptive algorithm based on the size of the input list.
+     */
+    public $Type$Block dedupeToBlockAdaptive() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        $Type$Block.Builder builder = $Type$Block.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+$if(BytesRef)$
+                case 1 -> builder.appendBytesRef(block.getBytesRef(first, work[0]));
+$else$
+                case 1 -> builder.append$Type$(block.get$Type$(first));
+$endif$
+                default -> {
+                    /*
+                     * It's better to copyMissing when there are few unique values
+                     * and better to copy and sort when there are many unique values.
+                     * The more duplicate values there are, the worse copyAndSort
+                     * looks by comparison. But we don't know how many unique values
+                     * there are because our job is to find them. So we use the count of values
+                     * as a proxy that is fast to test. It's not always going to be
+                     * optimal but it has the nice property of being quite quick on
+                     * short lists and not n^2 levels of terrible on long ones.
+                     *
+                     * It'd also be possible to make a truly hybrid mechanism that
+                     * switches from copyMissing to copyAndSort once it collects enough
+                     * unique values. The trouble is that the switch is expensive and
+                     * makes kind of a "hole" in the performance of that mechanism where
+                     * you may as well have just gone with either of the two other
+                     * strategies. So we just don't try it for now.
+                     */
+                    if (count < ALWAYS_COPY_MISSING) {
+                        copyMissing(first, count);
+                        writeUniquedWork(builder);
+                    } else {
+                        copyAndSort(first, count);
+                        writeSortedWork(builder);
+                    }
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values using an {@code n*log(n)} strategy with higher overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public $Type$Block dedupeToBlockUsingCopyAndSort() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        $Type$Block.Builder builder = $Type$Block.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+$if(BytesRef)$
+                case 1 -> builder.appendBytesRef(block.getBytesRef(first, work[0]));
+$else$
+                case 1 -> builder.append$Type$(block.get$Type$(first));
+$endif$
+                default -> {
+                    copyAndSort(first, count);
+                    writeSortedWork(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values using an {@code n^2} strategy with low overhead. Prefer {@link #dedupeToBlockAdaptive}.
+     * This is public for testing and performance testing.
+     */
+    public $Type$Block dedupeToBlockUsingCopyMissing() {
+        if (false == block.mayHaveMultivaluedFields()) {
+            return block;
+        }
+        $Type$Block.Builder builder = $Type$Block.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+$if(BytesRef)$
+                case 1 -> builder.appendBytesRef(block.getBytesRef(first, work[0]));
+$else$
+                case 1 -> builder.append$Type$(block.get$Type$(first));
+$endif$
+                default -> {
+                    copyMissing(first, count);
+                    writeUniquedWork(builder);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    /**
+     * Dedupe values and build a {@link LongBlock} suitable for passing
+     * as the grouping block to a {@link GroupingAggregatorFunction}.
+     */
+$if(BytesRef)$
+    public LongBlock hash(BytesRefHash hash) {
+$else$
+    public LongBlock hash(LongHash hash) {
+$endif$
+        LongBlock.Builder builder = LongBlock.newBlockBuilder(block.getPositionCount());
+        for (int p = 0; p < block.getPositionCount(); p++) {
+            int count = block.getValueCount(p);
+            int first = block.getFirstValueIndex(p);
+            switch (count) {
+                case 0 -> builder.appendNull();
+                case 1 -> {
+$if(BytesRef)$
+                    BytesRef v = block.getBytesRef(first, work[0]);
+$else$
+                    $type$ v = block.get$Type$(first);
+$endif$
+                    hash(builder, hash, v);
+                }
+                default -> {
+                    if (count < ALWAYS_COPY_MISSING) {
+                        copyMissing(first, count);
+                        hashUniquedWork(hash, builder);
+                    } else {
+                        copyAndSort(first, count);
+                        hashSortedWork(hash, builder);
+                    }
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    private void copyAndSort(int first, int count) {
+        grow(count);
+        int end = first + count;
+
+        w = 0;
+        for (int i = first; i < end; i++) {
+$if(BytesRef)$
+            work[w] = block.getBytesRef(i, work[w]);
+            w++;
+$else$
+            work[w++] = block.get$Type$(i);
+$endif$
+        }
+
+        Arrays.sort(work, 0, w);
+    }
+
+    private void copyMissing(int first, int count) {
+        grow(count);
+        int end = first + count;
+
+$if(BytesRef)$
+        work[0] = block.getBytesRef(first, work[0]);
+$else$
+        work[0] = block.get$Type$(first);
+$endif$
+        w = 1;
+        i: for (int i = first + 1; i < end; i++) {
+$if(BytesRef)$
+            $type$ v = block.getBytesRef(i, work[w]);
+$else$
+            $type$ v = block.get$Type$(i);
+$endif$
+            for (int j = 0; j < w; j++) {
+$if(BytesRef)$
+                if (v.equals(work[j])) {
+$else$
+                if (v == work[j]) {
+$endif$
+                    continue i;
+                }
+            }
+            work[w++] = v;
+        }
+    }
+
+    private void writeUniquedWork($Type$Block.Builder builder) {
+        if (w == 1) {
+            builder.append$Type$(work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = 0; i < w; i++) {
+            builder.append$Type$(work[i]);
+        }
+        builder.endPositionEntry();
+    }
+
+    private void writeSortedWork($Type$Block.Builder builder) {
+        if (w == 1) {
+            builder.append$Type$(work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        $type$ prev = work[0];
+        builder.append$Type$(prev);
+        for (int i = 1; i < w; i++) {
+$if(BytesRef)$
+            if (false == prev.equals(work[i])) {
+$else$
+            if (prev != work[i]) {
+$endif$
+                prev = work[i];
+                builder.append$Type$(prev);
+            }
+        }
+        builder.endPositionEntry();
+    }
+
+$if(BytesRef)$
+    private void hashUniquedWork(BytesRefHash hash, LongBlock.Builder builder) {
+$else$
+    private void hashUniquedWork(LongHash hash, LongBlock.Builder builder) {
+$endif$
+        if (w == 1) {
+            hash(builder, hash, work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = 0; i < w; i++) {
+            hash(builder, hash, work[i]);
+        }
+        builder.endPositionEntry();
+    }
+
+$if(BytesRef)$
+    private void hashSortedWork(BytesRefHash hash, LongBlock.Builder builder) {
+$else$
+    private void hashSortedWork(LongHash hash, LongBlock.Builder builder) {
+$endif$
+        if (w == 1) {
+            hash(builder, hash, work[0]);
+            return;
+        }
+        builder.beginPositionEntry();
+        $type$ prev = work[0];
+        hash(builder, hash, prev);
+        for (int i = 1; i < w; i++) {
+$if(BytesRef)$
+            if (false == prev.equals(work[i])) {
+$else$
+            if (prev != work[i]) {
+$endif$
+                prev = work[i];
+                hash(builder, hash, prev);
+            }
+        }
+        builder.endPositionEntry();
+    }
+
+    private void grow(int size) {
+$if(BytesRef)$
+        int prev = work.length;
+        work = ArrayUtil.grow(work, size);
+        fillWork(prev, work.length);
+$else$
+        work = ArrayUtil.grow(work, size);
+$endif$
+    }
+
+$if(BytesRef)$
+    private void fillWork(int from, int to) {
+        for (int i = from; i < to; i++) {
+            work[i] = new BytesRef();
+        }
+    }
+$endif$
+
+$if(BytesRef)$
+    private void hash(LongBlock.Builder builder, BytesRefHash hash, BytesRef v) {
+$else$
+    private void hash(LongBlock.Builder builder, LongHash hash, $type$ v) {
+$endif$
+$if(double)$
+        builder.appendLong(BlockHash.hashOrdToGroup(hash.add(Double.doubleToLongBits(v))));
+$else$
+        builder.appendLong(BlockHash.hashOrdToGroup(hash.add(v)));
+$endif$
+    }
+}
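
To make the tradeoff concrete outside the template, here is a standalone, hypothetical
sketch of the two strategies on a plain long[] - not part of this change - mirroring the
shapes the generated copyMissing and copyAndSort code specializes:

    import java.util.Arrays;

    final class DedupeSketch {
        // O(n^2) with a very low constant factor: scan what has been copied so far
        static long[] copyMissing(long[] values) {
            long[] work = new long[values.length];
            int w = 0;
            outer: for (long v : values) {
                for (int j = 0; j < w; j++) {
                    if (work[j] == v) {
                        continue outer;  // duplicate, skip it
                    }
                }
                work[w++] = v;
            }
            return Arrays.copyOf(work, w);
        }

        // O(n*log(n)) with higher overhead: sort, then keep the first of each run
        static long[] copyAndSort(long[] values) {
            long[] work = values.clone();
            Arrays.sort(work);
            int w = 0;
            for (int i = 0; i < work.length; i++) {
                if (i == 0 || work[i] != work[i - 1]) {
                    work[w++] = work[i];
                }
            }
            return Arrays.copyOf(work, w);
        }
    }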

+ 2 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTests.java

@@ -381,10 +381,10 @@ public class BlockHashTests extends ESTestCase {
             ordsAndKeys.ords,
             new long[] { 0 },
             new long[] { 0, 1 },
-            new long[] { 1, 0 },
+            new long[] { 0, 1 },  // Order is not preserved
             new long[] { 1 },
             null,
-            new long[] { 1, 0 }
+            new long[] { 0, 1 }
         );
         assertKeys(ordsAndKeys.keys, false, true);
         assertThat(ordsAndKeys.nonEmpty, equalTo(IntVector.range(0, 2)));

+ 11 - 5
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicBlockTests.java

@@ -745,18 +745,21 @@ public class BasicBlockTests extends ESTestCase {
         int positionCount,
         boolean nullAllowed,
         int minValuesPerPosition,
-        int maxValuesPerPosition
+        int maxValuesPerPosition,
+        int minDupsPerPosition,
+        int maxDupsPerPosition
     ) {
         List<List<Object>> values = new ArrayList<>();
         var builder = elementType.newBlockBuilder(positionCount);
         for (int p = 0; p < positionCount; p++) {
-            if (nullAllowed && randomBoolean()) {
+            int valueCount = between(minValuesPerPosition, maxValuesPerPosition);
+            if (valueCount == 0 || nullAllowed && randomBoolean()) {
                 values.add(null);
                 builder.appendNull();
                 continue;
             }
-            int valueCount = between(minValuesPerPosition, maxValuesPerPosition);
-            if (valueCount != 1) {
+            int dupCount = between(minDupsPerPosition, maxDupsPerPosition);
+            if (valueCount != 1 || dupCount != 0) {
                 builder.beginPositionEntry();
             }
             List<Object> valuesAtPosition = new ArrayList<>();
@@ -791,7 +794,10 @@ public class BasicBlockTests extends ESTestCase {
                     default -> throw new IllegalArgumentException("unsupported element type [" + elementType + "]");
                 }
             }
-            if (valueCount != 1) {
+            for (int i = 0; i < dupCount; i++) {
+                BlockTestUtils.append(builder, randomFrom(valuesAtPosition));
+            }
+            if (valueCount != 1 || dupCount != 0) {
                 builder.endPositionEntry();
             }
         }

+ 9 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderAppendBlockTests.java

@@ -70,7 +70,15 @@ public class BlockBuilderAppendBlockTests extends ESTestCase {
 
     public void testRandom() {
         ElementType elementType = randomFrom(ElementType.INT, ElementType.BYTES_REF, ElementType.BOOLEAN);
-        Block block = BasicBlockTests.randomBlock(elementType, randomIntBetween(1, 1024), randomBoolean(), 0, between(1, 16)).block();
+        Block block = BasicBlockTests.randomBlock(
+            elementType,
+            randomIntBetween(1, 1024),
+            randomBoolean(),
+            0,
+            between(1, 16),
+            0,
+            between(0, 16)
+        ).block();
         randomlyDivideAndMerge(block);
     }
 

+ 2 - 1
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockBuilderCopyFromTests.java

@@ -95,7 +95,8 @@ public class BlockBuilderCopyFromTests extends ESTestCase {
 
     private Block randomBlock() {
         int positionCount = randomIntBetween(1, 16 * 1024);
-        return BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, minValuesPerPosition, maxValuesPerPosition).block();
+        return BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, minValuesPerPosition, maxValuesPerPosition, 0, 0)
+            .block();
     }
 
     private Block randomFilteredBlock() {

+ 6 - 12
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java

@@ -45,18 +45,12 @@ public class BlockMultiValuedTests extends ESTestCase {
 
     public void testMultiValued() {
         int positionCount = randomIntBetween(1, 16 * 1024);
-        var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10);
+        var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0);
 
         assertThat(b.block().getPositionCount(), equalTo(positionCount));
         assertThat(b.block().getTotalValueCount(), equalTo(b.valueCount()));
-        for (int r = 0; r < positionCount; r++) {
-            if (b.values().get(r) == null) {
-                assertThat(b.block().getValueCount(r), equalTo(0));
-                assertThat(b.block().isNull(r), equalTo(true));
-            } else {
-                assertThat(b.block().getValueCount(r), equalTo(b.values().get(r).size()));
-                assertThat(BasicBlockTests.valuesAtPositions(b.block(), r, r + 1).get(0), equalTo(b.values().get(r)));
-            }
+        for (int p = 0; p < positionCount; p++) {
+            BlockTestUtils.assertPositionValues(b.block(), p, equalTo(b.values().get(p)));
         }
 
         assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1)));
@@ -64,7 +58,7 @@ public class BlockMultiValuedTests extends ESTestCase {
 
     public void testExpand() {
         int positionCount = randomIntBetween(1, 16 * 1024);
-        var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10);
+        var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 100, 0, 0);
         assertExpanded(b.block());
     }
 
@@ -102,7 +96,7 @@ public class BlockMultiValuedTests extends ESTestCase {
 
     private void assertFiltered(boolean all, boolean shuffled) {
         int positionCount = randomIntBetween(1, 16 * 1024);
-        var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10);
+        var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0);
         int[] positions = randomFilterPositions(b.block(), all, shuffled);
         Block filtered = b.block().filter(positions);
 
@@ -163,7 +157,7 @@ public class BlockMultiValuedTests extends ESTestCase {
 
     private void assertFilteredThenExpanded(boolean all, boolean shuffled) {
         int positionCount = randomIntBetween(1, 16 * 1024);
-        var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10);
+        var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0);
         int[] positions = randomFilterPositions(b.block(), all, shuffled);
         assertExpanded(b.block().filter(positions));
     }

+ 16 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockTestUtils.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.compute.data;
 
 import org.apache.lucene.util.BytesRef;
+import org.hamcrest.Matcher;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -19,6 +20,8 @@ import static org.elasticsearch.test.ESTestCase.randomBoolean;
 import static org.elasticsearch.test.ESTestCase.randomDouble;
 import static org.elasticsearch.test.ESTestCase.randomInt;
 import static org.elasticsearch.test.ESTestCase.randomLong;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
 
 public class BlockTestUtils {
     /**
@@ -81,4 +84,17 @@ public class BlockTestUtils {
             values.add(toJavaObject(block, p));
         }
     }
+
+    /**
+     * Assert that the values at a particular position match the provided {@link Matcher}.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> void assertPositionValues(Block b, int p, Matcher<T> valuesMatcher) {
+        List<Object> value = BasicBlockTests.valuesAtPositions(b, p, p + 1).get(0);
+        assertThat((T) value, valuesMatcher);
+        if (value == null) {
+            assertThat(b.getValueCount(p), equalTo(0));
+            assertThat(b.isNull(p), equalTo(true));
+        }
+    }
 }

+ 262 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MultivalueDedupeTests.java

@@ -0,0 +1,262 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.BytesRefHash;
+import org.elasticsearch.common.util.LongHash;
+import org.elasticsearch.compute.data.BasicBlockTests;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BlockTestUtils;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.ElementType;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.test.ESTestCase;
+import org.hamcrest.Matcher;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.LongFunction;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+
+public class MultivalueDedupeTests extends ESTestCase {
+    @ParametersFactory
+    public static List<Object[]> params() {
+        List<Object[]> params = new ArrayList<>();
+        for (ElementType elementType : ElementType.values()) {
+            if (elementType == ElementType.UNKNOWN || elementType == ElementType.NULL || elementType == ElementType.DOC) {
+                continue;
+            }
+            for (boolean nullAllowed : new boolean[] { false, true }) {
+                for (int max : new int[] { 10, 100, 1000 }) {
+                    params.add(new Object[] { elementType, 1000, nullAllowed, 1, max, 0, 0 });
+                    params.add(new Object[] { elementType, 1000, nullAllowed, 1, max, 0, 100 });
+                }
+            }
+        }
+        return params;
+    }
+
+    private final ElementType elementType;
+    private final int positionCount;
+    private final boolean nullAllowed;
+    private final int minValuesPerPosition;
+    private final int maxValuesPerPosition;
+    private final int minDupsPerPosition;
+    private final int maxDupsPerPosition;
+
+    public MultivalueDedupeTests(
+        ElementType elementType,
+        int positionCount,
+        boolean nullAllowed,
+        int minValuesPerPosition,
+        int maxValuesPerPosition,
+        int minDupsPerPosition,
+        int maxDupsPerPosition
+    ) {
+        this.elementType = elementType;
+        this.positionCount = positionCount;
+        this.nullAllowed = nullAllowed;
+        this.minValuesPerPosition = minValuesPerPosition;
+        this.maxValuesPerPosition = maxValuesPerPosition;
+        this.minDupsPerPosition = minDupsPerPosition;
+        this.maxDupsPerPosition = maxDupsPerPosition;
+    }
+
+    public void testDedupeAdaptive() {
+        BasicBlockTests.RandomBlock b = randomBlock();
+        assertDeduped(b, MultivalueDedupe.dedupeToBlockAdaptive(b.block()));
+    }
+
+    public void testDedupeViaCopyAndSort() {
+        BasicBlockTests.RandomBlock b = randomBlock();
+        assertDeduped(b, MultivalueDedupe.dedupeToBlockUsingCopyAndSort(b.block()));
+    }
+
+    public void testDedupeViaCopyMissing() {
+        BasicBlockTests.RandomBlock b = randomBlock();
+        assertDeduped(b, MultivalueDedupe.dedupeToBlockUsingCopyMissing(b.block()));
+    }
+
+    private BasicBlockTests.RandomBlock randomBlock() {
+        return BasicBlockTests.randomBlock(
+            elementType,
+            positionCount,
+            nullAllowed,
+            minValuesPerPosition,
+            maxValuesPerPosition,
+            minDupsPerPosition,
+            maxDupsPerPosition
+        );
+    }
+
+    private void assertDeduped(BasicBlockTests.RandomBlock b, Block deduped) {
+        for (int p = 0; p < b.block().getPositionCount(); p++) {
+            List<Object> v = b.values().get(p);
+            Matcher<? extends Object> matcher = v == null
+                ? nullValue()
+                : containsInAnyOrder(v.stream().collect(Collectors.toSet()).stream().sorted().toArray());
+            BlockTestUtils.assertPositionValues(deduped, p, matcher);
+        }
+    }
+
+    public void testHash() {
+        BasicBlockTests.RandomBlock b = randomBlock();
+
+        switch (b.block().elementType()) {
+            case BOOLEAN -> assertBooleanHash(Set.of(), b);
+            case BYTES_REF -> assertBytesRefHash(Set.of(), b);
+            case INT -> assertIntHash(Set.of(), b);
+            case LONG -> assertLongHash(Set.of(), b);
+            case DOUBLE -> assertDoubleHash(Set.of(), b);
+            default -> throw new IllegalArgumentException();
+        }
+    }
+
+    public void testHashWithPreviousValues() {
+        BasicBlockTests.RandomBlock b = randomBlock();
+
+        switch (b.block().elementType()) {
+            case BOOLEAN -> {
+                Set<Boolean> previousValues = switch (between(0, 2)) {
+                    case 0 -> Set.of(false);
+                    case 1 -> Set.of(true);
+                    case 2 -> Set.of(false, true);
+                    default -> throw new IllegalArgumentException();
+                };
+                assertBooleanHash(previousValues, b);
+            }
+            case BYTES_REF -> {
+                int prevSize = between(1, 10000);
+                Set<BytesRef> previousValues = new HashSet<>(prevSize);
+                while (previousValues.size() < prevSize) {
+                    previousValues.add(new BytesRef(randomAlphaOfLengthBetween(1, 20)));
+                }
+                assertBytesRefHash(previousValues, b);
+            }
+            case INT -> {
+                int prevSize = between(1, 10000);
+                Set<Integer> previousValues = new HashSet<>(prevSize);
+                while (previousValues.size() < prevSize) {
+                    previousValues.add(randomInt());
+                }
+                assertIntHash(previousValues, b);
+            }
+            case LONG -> {
+                int prevSize = between(1, 10000);
+                Set<Long> previousValues = new HashSet<>(prevSize);
+                while (previousValues.size() < prevSize) {
+                    previousValues.add(randomLong());
+                }
+                assertLongHash(previousValues, b);
+            }
+            case DOUBLE -> {
+                int prevSize = between(1, 10000);
+                Set<Double> previousValues = new HashSet<>(prevSize);
+                while (previousValues.size() < prevSize) {
+                    previousValues.add(randomDouble());
+                }
+                assertDoubleHash(previousValues, b);
+            }
+            default -> throw new IllegalArgumentException();
+        }
+    }
+
+    private void assertBooleanHash(Set<Boolean> previousValues, BasicBlockTests.RandomBlock b) {
+        boolean[] everSeen = new boolean[2];
+        if (previousValues.contains(false)) {
+            everSeen[0] = true;
+        }
+        if (previousValues.contains(true)) {
+            everSeen[1] = true;
+        }
+        LongBlock hashes = new MultivalueDedupeBoolean((BooleanBlock) b.block()).hash(everSeen);
+        List<Boolean> hashedValues = new ArrayList<>();
+        if (everSeen[0]) {
+            hashedValues.add(false);
+        }
+        if (everSeen[1]) {
+            hashedValues.add(true);
+        }
+        assertHash(b, hashes, hashedValues.size(), previousValues, i -> hashedValues.get((int) i));
+    }
+
+    private void assertBytesRefHash(Set<BytesRef> previousValues, BasicBlockTests.RandomBlock b) {
+        BytesRefHash hash = new BytesRefHash(1, BigArrays.NON_RECYCLING_INSTANCE);
+        previousValues.forEach(hash::add);
+        LongBlock hashes = new MultivalueDedupeBytesRef((BytesRefBlock) b.block()).hash(hash);
+        assertHash(b, hashes, hash.size(), previousValues, i -> hash.get(i, new BytesRef()));
+    }
+
+    private void assertIntHash(Set<Integer> previousValues, BasicBlockTests.RandomBlock b) {
+        LongHash hash = new LongHash(1, BigArrays.NON_RECYCLING_INSTANCE);
+        previousValues.forEach(hash::add);
+        LongBlock hashes = new MultivalueDedupeInt((IntBlock) b.block()).hash(hash);
+        assertHash(b, hashes, hash.size(), previousValues, i -> (int) hash.get(i));
+    }
+
+    private void assertLongHash(Set<Long> previousValues, BasicBlockTests.RandomBlock b) {
+        LongHash hash = new LongHash(1, BigArrays.NON_RECYCLING_INSTANCE);
+        previousValues.forEach(hash::add);
+        LongBlock hashes = new MultivalueDedupeLong((LongBlock) b.block()).hash(hash);
+        assertHash(b, hashes, hash.size(), previousValues, i -> hash.get(i));
+    }
+
+    private void assertDoubleHash(Set<Double> previousValues, BasicBlockTests.RandomBlock b) {
+        LongHash hash = new LongHash(1, BigArrays.NON_RECYCLING_INSTANCE);
+        previousValues.forEach(d -> hash.add(Double.doubleToLongBits(d)));
+        LongBlock hashes = new MultivalueDedupeDouble((DoubleBlock) b.block()).hash(hash);
+        assertHash(b, hashes, hash.size(), previousValues, i -> Double.longBitsToDouble(hash.get(i)));
+    }
+
+    private void assertHash(
+        BasicBlockTests.RandomBlock b,
+        LongBlock hashes,
+        long hashSize,
+        Set<? extends Object> previousValues,
+        LongFunction<Object> lookup
+    ) {
+        Set<Object> allValues = new HashSet<>(previousValues);
+        for (int p = 0; p < b.block().getPositionCount(); p++) {
+            int count = hashes.getValueCount(p);
+            List<Object> v = b.values().get(p);
+            if (v == null) {
+                assertThat(hashes.isNull(p), equalTo(true));
+                assertThat(count, equalTo(0));
+                continue;
+            }
+            List<Object> actualValues = new ArrayList<>(count);
+            int start = hashes.getFirstValueIndex(p);
+            int end = start + count;
+            for (int i = start; i < end; i++) {
+                actualValues.add(lookup.apply(hashes.getLong(i)));
+            }
+            assertThat(actualValues, containsInAnyOrder(v.stream().distinct().sorted().toArray()));
+            allValues.addAll(v);
+        }
+
+        Set<Object> hashedValues = new HashSet<>((int) hashSize);
+        for (long i = 0; i < hashSize; i++) {
+            hashedValues.add(lookup.apply(i));
+        }
+        assertThat(hashedValues, equalTo(allValues));
+    }
+}
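
A note on the machinery these assertions lean on: org.elasticsearch.common.util.LongHash and BytesRefHash hand back the newly assigned ord when add sees a fresh key, and -1 - ord when the key is already present, and assertDoubleHash packs doubles through Double.doubleToLongBits so they can share a LongHash. A minimal standalone sketch of that assumed contract (illustrative only, not code from this change; the class name is made up):

    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.util.LongHash;

    class LongHashOrdSketch {
        public static void main(String[] args) {
            try (LongHash hash = new LongHash(1, BigArrays.NON_RECYCLING_INSTANCE)) {
                long first = hash.add(42);   // >= 0: key was new, this is its ord
                long dupe = hash.add(42);    // < 0: key was already present
                assert dupe == -1 - first;
                // Doubles ride along as their raw long bits and round-trip cleanly,
                // exactly as assertDoubleHash does above.
                long ord = hash.add(Double.doubleToLongBits(1.5));
                assert Double.longBitsToDouble(hash.get(ord)) == 1.5;
            }
        }
    }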

+ 2 - 2
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java

@@ -37,8 +37,8 @@ public class MvExpandOperatorTests extends OperatorTestCase {
             protected Page createPage(int positionOffset, int length) {
                 idx += length;
                 return new Page(
-                    randomBlock(ElementType.INT, length, true, 1, 10).block(),
-                    randomBlock(ElementType.INT, length, false, 1, 10).block()
+                    randomBlock(ElementType.INT, length, true, 1, 10, 0, 0).block(),
+                    randomBlock(ElementType.INT, length, false, 1, 10, 0, 0).block()
                 );
             }
         };

+ 7 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/floats.csv-spec

@@ -193,3 +193,10 @@ emp_no:integer |salary_change:double
 10030          | -0.4
 10030          | -0.4
 ;
+
+mvDedupe
+row a = [1.1, 2.1, 2.1] | eval da = mv_dedupe(a);
+
+       a:double | da:double
+[1.1, 2.1, 2.1] | [1.1, 2.1]
+;

+ 7 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/ints.csv-spec

@@ -264,3 +264,10 @@ emp_no:integer |salary_change.int:integer
 10030          | -0
 10030          | -0
 ;
+
+mvDedupe
+row a = [1, 2, 2, 3] | eval da = mv_dedupe(a);
+
+   a:integer | da:integer
+[1, 2, 2, 3] | [1, 2, 3]
+;

+ 1 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/show.csv-spec

@@ -33,6 +33,7 @@ min                      |min(arg1)
 mv_avg                   |mv_avg(arg1)
 mv_concat                |mv_concat(arg1, arg2)
 mv_count                 |mv_count(arg1)
+mv_dedupe                |mv_dedupe(arg1)
 mv_max                   |mv_max(arg1)
 mv_median                |mv_median(arg1)
 mv_min                   |mv_min(arg1)

+ 13 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec

@@ -232,6 +232,19 @@ ROW a=["foo", "zoo", "bar"]
 // end::mv_count-result[]
 ;
 
+mvDedupe
+// tag::mv_dedupe[]
+ROW a=["foo", "foo", "bar", "foo"]
+| EVAL dedupe_a = MV_DEDUPE(a)
+// end::mv_dedupe[]
+;
+
+// tag::mv_dedupe-result[]
+                   a:keyword | dedupe_a:keyword
+["foo", "foo", "bar", "foo"] | ["foo", "bar"]
+// end::mv_dedupe-result[]
+;
+
 mvJoin
 // tag::mv_concat[]
 ROW a=["foo", "zoo", "bar"]

+ 0 - 80
x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToBooleanFromKeywordEvaluator.java

@@ -1,80 +0,0 @@
-// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
-// or more contributor license agreements. Licensed under the Elastic License
-// 2.0; you may not use this file except in compliance with the Elastic License
-// 2.0.
-package org.elasticsearch.xpack.esql.expression.function.scalar.convert;
-
-import java.lang.Override;
-import java.lang.String;
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.BooleanArrayVector;
-import org.elasticsearch.compute.data.BooleanBlock;
-import org.elasticsearch.compute.data.BytesRefBlock;
-import org.elasticsearch.compute.data.BytesRefVector;
-import org.elasticsearch.compute.data.ConstantBooleanVector;
-import org.elasticsearch.compute.data.Vector;
-import org.elasticsearch.compute.operator.EvalOperator;
-
-/**
- * {@link EvalOperator.ExpressionEvaluator} implementation for {@link ToBoolean}.
- * This class is generated. Do not edit it.
- */
-public final class ToBooleanFromKeywordEvaluator extends AbstractConvertFunction.AbstractEvaluator {
-  public ToBooleanFromKeywordEvaluator(EvalOperator.ExpressionEvaluator field) {
-    super(field);
-  }
-
-  @Override
-  public String name() {
-    return "ToBoolean";
-  }
-
-  @Override
-  public Vector evalVector(Vector v) {
-    BytesRefVector vector = (BytesRefVector) v;
-    int positionCount = v.getPositionCount();
-    BytesRef scratchPad = new BytesRef();
-    if (vector.isConstant()) {
-      return new ConstantBooleanVector(evalValue(vector, 0, scratchPad), positionCount);
-    }
-    boolean[] values = new boolean[positionCount];
-    for (int p = 0; p < positionCount; p++) {
-      values[p] = evalValue(vector, p, scratchPad);
-    }
-    return new BooleanArrayVector(values, positionCount);
-  }
-
-  private static boolean evalValue(BytesRefVector container, int index, BytesRef scratchPad) {
-    BytesRef value = container.getBytesRef(index, scratchPad);
-    return ToBoolean.fromKeyword(value);
-  }
-
-  @Override
-  public Block evalBlock(Block b) {
-    BytesRefBlock block = (BytesRefBlock) b;
-    int positionCount = block.getPositionCount();
-    BooleanBlock.Builder builder = BooleanBlock.newBlockBuilder(positionCount);
-    BytesRef scratchPad = new BytesRef();
-    for (int p = 0; p < positionCount; p++) {
-      int valueCount = block.getValueCount(p);
-      if (valueCount == 0) {
-        builder.appendNull();
-        continue;
-      }
-      int start = block.getFirstValueIndex(p);
-      int end = start + valueCount;
-      builder.beginPositionEntry();
-      for (int i = start; i < end; i++) {
-        builder.appendBoolean(evalValue(block, i, scratchPad));
-      }
-      builder.endPositionEntry();
-    }
-    return builder.build();
-  }
-
-  private static boolean evalValue(BytesRefBlock container, int index, BytesRef scratchPad) {
-    BytesRef value = container.getBytesRef(index, scratchPad);
-    return ToBoolean.fromKeyword(value);
-  }
-}

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

@@ -42,6 +42,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.math.Round;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvAvg;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvConcat;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvCount;
+import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvDedupe;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMax;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMedian;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMin;
@@ -121,6 +122,7 @@ public class EsqlFunctionRegistry extends FunctionRegistry {
                 def(MvAvg.class, MvAvg::new, "mv_avg"),
                 def(MvConcat.class, MvConcat::new, "mv_concat"),
                 def(MvCount.class, MvCount::new, "mv_count"),
+                def(MvDedupe.class, MvDedupe::new, "mv_dedupe"),
                 def(MvMax.class, MvMax::new, "mv_max"),
                 def(MvMedian.class, MvMedian::new, "mv_median"),
                 def(MvMin.class, MvMin::new, "mv_min"),

+ 50 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvDedupe.java

@@ -0,0 +1,50 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.compute.operator.MultivalueDedupe;
+import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
+import org.elasticsearch.xpack.ql.expression.Expression;
+import org.elasticsearch.xpack.ql.tree.NodeInfo;
+import org.elasticsearch.xpack.ql.tree.Source;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.isType;
+
+/**
+ * Removes duplicate values from a multivalued field.
+ */
+public class MvDedupe extends AbstractMultivalueFunction {
+    public MvDedupe(Source source, Expression field) {
+        super(source, field);
+    }
+
+    @Override
+    protected TypeResolution resolveFieldType() {
+        return isType(field(), EsqlDataTypes::isRepresentable, sourceText(), null, "representable");
+    }
+
+    @Override
+    protected Supplier<EvalOperator.ExpressionEvaluator> evaluator(Supplier<EvalOperator.ExpressionEvaluator> fieldEval) {
+        return MultivalueDedupe.evaluator(LocalExecutionPlanner.toElementType(dataType()), fieldEval);
+    }
+
+    @Override
+    public Expression replaceChildren(List<Expression> newChildren) {
+        return new MvDedupe(source(), newChildren.get(0));
+    }
+
+    @Override
+    protected NodeInfo<? extends Expression> info() {
+        return NodeInfo.create(this, MvDedupe::new, field());
+    }
+}
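
MvDedupe itself is deliberately thin: it resolves types and hands off to the per-element-type MultivalueDedupe evaluators. As a simplified sketch of what such an evaluator does per position, written against the Block API visible elsewhere in this diff (illustrative Java under those assumptions, not the generated implementation):

    import java.util.Arrays;

    import org.elasticsearch.compute.data.IntBlock;

    final class MvDedupeSketch {
        static IntBlock dedupe(IntBlock block) {
            IntBlock.Builder builder = IntBlock.newBlockBuilder(block.getPositionCount());
            for (int p = 0; p < block.getPositionCount(); p++) {
                int count = block.getValueCount(p);
                if (count == 0) {
                    builder.appendNull();
                    continue;
                }
                int first = block.getFirstValueIndex(p);
                // Copy the position's values, sort them, then emit each run of
                // equal values exactly once.
                int[] work = new int[count];
                for (int i = 0; i < count; i++) {
                    work[i] = block.getInt(first + i);
                }
                Arrays.sort(work);
                builder.beginPositionEntry();
                builder.appendInt(work[0]);
                for (int i = 1; i < count; i++) {
                    if (work[i] != work[i - 1]) {
                        builder.appendInt(work[i]);
                    }
                }
                builder.endPositionEntry();
            }
            return builder.build();
        }
    }

The same generated MultivalueDedupe* classes also expose the hash(...) entry points exercised by the tests earlier in this diff.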

+ 6 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

@@ -53,6 +53,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.Abstra
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvAvg;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvConcat;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvCount;
+import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvDedupe;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMax;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMedian;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMin;
@@ -305,7 +306,8 @@ public final class PlanNamedTypes {
             // Multivalue functions
             of(ScalarFunction.class, MvAvg.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
             of(ScalarFunction.class, MvCount.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
-            of(ScalarFunction.class, MvConcat.class, PlanNamedTypes::writeMvJoin, PlanNamedTypes::readMvJoin),
+            of(ScalarFunction.class, MvConcat.class, PlanNamedTypes::writeMvConcat, PlanNamedTypes::readMvConcat),
+            of(ScalarFunction.class, MvDedupe.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
             of(ScalarFunction.class, MvMax.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
             of(ScalarFunction.class, MvMedian.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
             of(ScalarFunction.class, MvMin.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
@@ -1166,6 +1168,7 @@ public final class PlanNamedTypes {
     static final Map<String, BiFunction<Source, Expression, AbstractMultivalueFunction>> MV_CTRS = Map.ofEntries(
         entry(name(MvAvg.class), MvAvg::new),
         entry(name(MvCount.class), MvCount::new),
+        entry(name(MvDedupe.class), MvDedupe::new),
         entry(name(MvMax.class), MvMax::new),
         entry(name(MvMedian.class), MvMedian::new),
         entry(name(MvMin.class), MvMin::new),
@@ -1180,11 +1183,11 @@ public final class PlanNamedTypes {
         out.writeExpression(fn.field());
     }
 
-    static MvConcat readMvJoin(PlanStreamInput in) throws IOException {
+    static MvConcat readMvConcat(PlanStreamInput in) throws IOException {
         return new MvConcat(Source.EMPTY, in.readExpression(), in.readExpression());
     }
 
-    static void writeMvJoin(PlanStreamOutput out, MvConcat fn) throws IOException {
+    static void writeMvConcat(PlanStreamOutput out, MvConcat fn) throws IOException {
         out.writeExpression(fn.left());
         out.writeExpression(fn.right());
     }

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlStatsAction.java

@@ -50,7 +50,7 @@ public class TransportEsqlStatsAction extends TransportNodesAction<
             EsqlStatsRequest::new,
             EsqlStatsRequest.NodeStatsRequest::new,
             ThreadPool.Names.MANAGEMENT
-            );
+        );
         this.planExecutor = planExecutor;
     }
 

+ 53 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvDedupeTests.java

@@ -0,0 +1,53 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import org.elasticsearch.xpack.ql.expression.Expression;
+import org.elasticsearch.xpack.ql.tree.Source;
+import org.elasticsearch.xpack.ql.type.DataType;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+
+public class MvDedupeTests extends AbstractMultivalueFunctionTestCase {
+    @Override
+    protected Expression build(Source source, Expression field) {
+        return new MvDedupe(source, field);
+    }
+
+    @Override
+    protected DataType[] supportedTypes() {
+        return representable();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Matcher<Object> resultMatcherForInput(List<?> input) {
+        if (input == null) {
+            return nullValue();
+        }
+        Set<Object> values = input.stream().collect(Collectors.toSet());
+        return switch (values.size()) {
+            case 0 -> nullValue();
+            case 1 -> equalTo(values.iterator().next());
+            default -> (Matcher<Object>) (Matcher<?>) containsInAnyOrder(values.stream().map(Matchers::equalTo).toArray(Matcher[]::new));
+        };
+    }
+
+    @Override
+    protected String expectedEvaluatorSimpleToString() {
+        return "MvDedupe[field=Attribute[channel=0]]";
+    }
+}