소스 검색

add mv_slice and mv_zip (#106147)

Fang Xing 1 년 전
부모
커밋
8d839e3b52
27개의 변경된 파일2447개의 추가작업 그리고 17개의 파일을 삭제
  1. 4 0
      docs/reference/esql/functions/mv-functions.asciidoc
  2. 47 0
      docs/reference/esql/functions/mv_slice.asciidoc
  3. 38 0
      docs/reference/esql/functions/mv_zip.asciidoc
  4. 1 0
      docs/reference/esql/functions/signature/mv_slice.svg
  5. 1 0
      docs/reference/esql/functions/signature/mv_zip.svg
  6. 17 0
      docs/reference/esql/functions/types/mv_slice.asciidoc
  7. 6 0
      docs/reference/esql/functions/types/mv_zip.asciidoc
  8. 175 15
      x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/EvaluatorImplementer.java
  9. 23 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/boolean.csv-spec
  10. 15 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/floats.csv-spec
  11. 145 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/ints.csv-spec
  12. 44 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec
  13. 5 1
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/show.csv-spec
  14. 44 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec
  15. 140 0
      x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceBooleanEvaluator.java
  16. 140 0
      x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceBytesRefEvaluator.java
  17. 140 0
      x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceDoubleEvaluator.java
  18. 139 0
      x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceIntEvaluator.java
  19. 140 0
      x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceLongEvaluator.java
  20. 127 0
      x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvZipEvaluator.java
  21. 4 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
  22. 344 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSlice.java
  23. 211 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvZip.java
  24. 30 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java
  25. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractFunctionTestCase.java
  26. 346 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceTests.java
  27. 120 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvZipTests.java

+ 4 - 0
docs/reference/esql/functions/mv-functions.asciidoc

@@ -17,7 +17,9 @@
 * <<esql-mv_max>>
 * <<esql-mv_median>>
 * <<esql-mv_min>>
+* <<esql-mv_slice>>
 * <<esql-mv_sum>>
+* <<esql-mv_zip>>
 // end::mv_list[]
 
 include::mv_avg.asciidoc[]
@@ -29,4 +31,6 @@ include::mv_last.asciidoc[]
 include::mv_max.asciidoc[]
 include::mv_median.asciidoc[]
 include::mv_min.asciidoc[]
+include::mv_slice.asciidoc[]
 include::mv_sum.asciidoc[]
+include::mv_zip.asciidoc[]

+ 47 - 0
docs/reference/esql/functions/mv_slice.asciidoc

@@ -0,0 +1,47 @@
+[discrete]
+[[esql-mv_slice]]
+=== `MV_SLICE`
+
+*Syntax*
+
+[.text-center]
+image::esql/functions/signature/mv_slice.svg[Embedded,opts=inline]
+
+*Parameters*
+
+`field`::
+Multivalue expression. If `null`, the function returns `null`.
+
+`start`::
+Start position. If `null`, the function returns `null`. The start argument can be negative. An index of -1 is used to specify the last value in the list.
+
+`end`::
+End position. Optional; if omitted, the position at `start` is returned. The end argument can be negative. An index of -1 is used to specify the last value in the list.
+
+*Description*
+
+Returns a subset of the multivalued field using the start and end index values.
+
+*Supported types*
+
+include::types/mv_slice.asciidoc[]
+
+*Example*
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/ints.csv-spec[tag=mv_slice_positive]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/ints.csv-spec[tag=mv_slice_positive-result]
+|===
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/ints.csv-spec[tag=mv_slice_negative]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/ints.csv-spec[tag=mv_slice_negative-result]
+|===

+ 38 - 0
docs/reference/esql/functions/mv_zip.asciidoc

@@ -0,0 +1,38 @@
+[discrete]
+[[esql-mv_zip]]
+=== `MV_ZIP`
+
+*Syntax*
+
+[.text-center]
+image::esql/functions/signature/mv_zip.svg[Embedded,opts=inline]
+
+*Parameters*
+
+`mvLeft`::
+Multivalue expression.
+
+`mvRight`::
+Multivalue expression.
+
+`delim`::
+Delimiter. Optional; if omitted, `,` is used as a default delimiter.
+
+*Description*
+
+Combines the values from two multivalued fields with a delimiter that joins them together.
+
+*Supported types*
+
+include::types/mv_zip.asciidoc[]
+
+*Example*
+
+[source.merge.styled,esql]
+----
+include::{esql-specs}/string.csv-spec[tag=mv_zip]
+----
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+include::{esql-specs}/string.csv-spec[tag=mv_zip-result]
+|===

+ 1 - 0
docs/reference/esql/functions/signature/mv_slice.svg

@@ -0,0 +1 @@
+<svg version="1.1" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg" width="492" height="46" viewbox="0 0 492 46"><defs><style type="text/css">#guide .c{fill:none;stroke:#222222;}#guide .k{fill:#000000;font-family:Roboto Mono,Sans-serif;font-size:20px;}#guide .s{fill:#e4f4ff;stroke:#222222;}#guide .syn{fill:#8D8D8D;font-family:Roboto Mono,Sans-serif;font-size:20px;}</style></defs><path class="c" d="M0 31h5m116 0h10m32 0h10m32 0h10m32 0h10m80 0h10m32 0h10m56 0h10m32 0h5"/><rect class="s" x="5" y="5" width="116" height="36"/><text class="k" x="15" y="31">MV_SLICE</text><rect class="s" x="131" y="5" width="32" height="36" rx="7"/><text class="syn" x="141" y="31">(</text><rect class="s" x="173" y="5" width="32" height="36" rx="7"/><text class="k" x="183" y="31">v</text><rect class="s" x="215" y="5" width="32" height="36" rx="7"/><text class="syn" x="225" y="31">,</text><rect class="s" x="257" y="5" width="80" height="36" rx="7"/><text class="k" x="267" y="31">start</text><rect class="s" x="347" y="5" width="32" height="36" rx="7"/><text class="syn" x="357" y="31">,</text><rect class="s" x="389" y="5" width="56" height="36" rx="7"/><text class="k" x="399" y="31">end</text><rect class="s" x="455" y="5" width="32" height="36" rx="7"/><text class="syn" x="465" y="31">)</text></svg>

+ 1 - 0
docs/reference/esql/functions/signature/mv_zip.svg

@@ -0,0 +1 @@
+<svg version="1.1" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg" width="576" height="46" viewbox="0 0 576 46"><defs><style type="text/css">#guide .c{fill:none;stroke:#222222;}#guide .k{fill:#000000;font-family:Roboto Mono,Sans-serif;font-size:20px;}#guide .s{fill:#e4f4ff;stroke:#222222;}#guide .syn{fill:#8D8D8D;font-family:Roboto Mono,Sans-serif;font-size:20px;}</style></defs><path class="c" d="M0 31h5m92 0h10m32 0h10m92 0h10m32 0h10m104 0h10m32 0h10m80 0h10m32 0h5"/><rect class="s" x="5" y="5" width="92" height="36"/><text class="k" x="15" y="31">MV_ZIP</text><rect class="s" x="107" y="5" width="32" height="36" rx="7"/><text class="syn" x="117" y="31">(</text><rect class="s" x="149" y="5" width="92" height="36" rx="7"/><text class="k" x="159" y="31">mvLeft</text><rect class="s" x="251" y="5" width="32" height="36" rx="7"/><text class="syn" x="261" y="31">,</text><rect class="s" x="293" y="5" width="104" height="36" rx="7"/><text class="k" x="303" y="31">mvRight</text><rect class="s" x="407" y="5" width="32" height="36" rx="7"/><text class="syn" x="417" y="31">,</text><rect class="s" x="449" y="5" width="80" height="36" rx="7"/><text class="k" x="459" y="31">delim</text><rect class="s" x="539" y="5" width="32" height="36" rx="7"/><text class="syn" x="549" y="31">)</text></svg>

+ 17 - 0
docs/reference/esql/functions/types/mv_slice.asciidoc

@@ -0,0 +1,17 @@
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+v | start | end | result
+boolean | integer | integer | boolean
+cartesian_point | integer | integer | cartesian_point
+cartesian_shape | integer | integer | cartesian_shape
+datetime | integer | integer | datetime
+double | integer | integer | double
+geo_point | integer | integer | geo_point
+geo_shape | integer | integer | geo_shape
+integer | integer | integer | integer
+ip | integer | integer | ip
+keyword | integer | integer | keyword
+long | integer | integer | long
+text | integer | integer | text
+version | integer | integer | version
+|===

+ 6 - 0
docs/reference/esql/functions/types/mv_zip.asciidoc

@@ -0,0 +1,6 @@
+[%header.monospaced.styled,format=dsv,separator=|]
+|===
+mvLeft | mvRight | delim | result
+keyword | keyword | keyword | keyword
+text | text | text | keyword
+|===

+ 175 - 15
x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/EvaluatorImplementer.java

@@ -36,10 +36,15 @@ import static org.elasticsearch.compute.gen.Methods.appendMethod;
 import static org.elasticsearch.compute.gen.Methods.buildFromFactory;
 import static org.elasticsearch.compute.gen.Methods.getMethod;
 import static org.elasticsearch.compute.gen.Types.BLOCK;
+import static org.elasticsearch.compute.gen.Types.BOOLEAN_BLOCK;
 import static org.elasticsearch.compute.gen.Types.BYTES_REF;
+import static org.elasticsearch.compute.gen.Types.BYTES_REF_BLOCK;
+import static org.elasticsearch.compute.gen.Types.DOUBLE_BLOCK;
 import static org.elasticsearch.compute.gen.Types.DRIVER_CONTEXT;
 import static org.elasticsearch.compute.gen.Types.EXPRESSION_EVALUATOR;
 import static org.elasticsearch.compute.gen.Types.EXPRESSION_EVALUATOR_FACTORY;
+import static org.elasticsearch.compute.gen.Types.INT_BLOCK;
+import static org.elasticsearch.compute.gen.Types.LONG_BLOCK;
 import static org.elasticsearch.compute.gen.Types.PAGE;
 import static org.elasticsearch.compute.gen.Types.RELEASABLE;
 import static org.elasticsearch.compute.gen.Types.RELEASABLES;
@@ -53,6 +58,7 @@ public class EvaluatorImplementer {
     private final TypeElement declarationType;
     private final ProcessFunction processFunction;
     private final ClassName implementation;
+    private final boolean processOutputsMultivalued;
 
     public EvaluatorImplementer(
         Elements elements,
@@ -68,6 +74,7 @@ public class EvaluatorImplementer {
             elements.getPackageOf(declarationType).toString(),
             declarationType.getSimpleName() + extraName + "Evaluator"
         );
+        this.processOutputsMultivalued = this.processFunction.hasBlockType && (this.processFunction.builderArg != null);
     }
 
     public JavaFile sourceFile() {
@@ -94,10 +101,17 @@ public class EvaluatorImplementer {
 
         builder.addMethod(ctor());
         builder.addMethod(eval());
-        if (processFunction.args.stream().anyMatch(x -> x instanceof FixedProcessFunctionArg == false)) {
-            builder.addMethod(realEval(true));
+
+        if (processOutputsMultivalued) {
+            if (processFunction.args.stream().anyMatch(x -> x instanceof FixedProcessFunctionArg == false)) {
+                builder.addMethod(realEval(true));
+            }
+        } else {
+            if (processFunction.args.stream().anyMatch(x -> x instanceof FixedProcessFunctionArg == false)) {
+                builder.addMethod(realEval(true));
+            }
+            builder.addMethod(realEval(false));
         }
-        builder.addMethod(realEval(false));
         builder.addMethod(toStringMethod());
         builder.addMethod(close());
         return builder.build();
@@ -117,17 +131,21 @@ public class EvaluatorImplementer {
     private MethodSpec eval() {
         MethodSpec.Builder builder = MethodSpec.methodBuilder("eval").addAnnotation(Override.class);
         builder.addModifiers(Modifier.PUBLIC).returns(BLOCK).addParameter(PAGE, "page");
-
         processFunction.args.stream().forEach(a -> a.evalToBlock(builder));
         String invokeBlockEval = invokeRealEval(true);
-        processFunction.args.stream().forEach(a -> a.resolveVectors(builder, invokeBlockEval));
-        builder.addStatement(invokeRealEval(false));
+        if (processOutputsMultivalued) {
+            builder.addStatement(invokeBlockEval);
+        } else {
+            processFunction.args.stream().forEach(a -> a.resolveVectors(builder, invokeBlockEval));
+            builder.addStatement(invokeRealEval(false));
+        }
         processFunction.args.stream().forEach(a -> a.closeEvalToBlock(builder));
         return builder.build();
     }
 
     private String invokeRealEval(boolean blockStyle) {
         StringBuilder builder = new StringBuilder("return eval(page.getPositionCount()");
+
         String params = processFunction.args.stream()
             .map(a -> a.paramName(blockStyle))
             .filter(a -> a != null)
@@ -154,6 +172,7 @@ public class EvaluatorImplementer {
                 builder.addParameter(a.dataType(blockStyle), a.paramName(blockStyle));
             }
         });
+
         TypeName builderType = builderType(resultDataType);
         builder.beginControlFlow(
             "try($T result = driverContext.blockFactory().$L(positionCount))",
@@ -166,13 +185,36 @@ public class EvaluatorImplementer {
             builder.beginControlFlow("position: for (int p = 0; p < positionCount; p++)");
             {
                 if (blockStyle) {
-                    processFunction.args.stream().forEach(a -> a.skipNull(builder));
+                    if (processOutputsMultivalued == false) {
+                        processFunction.args.stream().forEach(a -> a.skipNull(builder));
+                    } else {
+                        builder.addStatement("boolean allBlocksAreNulls = true");
+                        // allow block type inputs to be null
+                        processFunction.args.stream().forEach(a -> {
+                            if (a instanceof StandardProcessFunctionArg as) {
+                                as.skipNull(builder);
+                            } else if (a instanceof BlockProcessFunctionArg ab) {
+                                builder.beginControlFlow("if (!$N.isNull(p))", ab.paramName(blockStyle));
+                                {
+                                    builder.addStatement("allBlocksAreNulls = false");
+                                }
+                                builder.endControlFlow();
+                            }
+                        });
+
+                        builder.beginControlFlow("if (allBlocksAreNulls)");
+                        {
+                            builder.addStatement("result.appendNull()");
+                            builder.addStatement("continue position");
+                        }
+                        builder.endControlFlow();
+                    }
                 }
                 processFunction.args.stream().forEach(a -> a.unpackValues(builder, blockStyle));
 
                 StringBuilder pattern = new StringBuilder();
                 List<Object> args = new ArrayList<>();
-                pattern.append("$T.$N(");
+                pattern.append(processOutputsMultivalued ? "$T.$N(result, p, " : "$T.$N(");
                 args.add(declarationType);
                 args.add(processFunction.function.getSimpleName());
                 processFunction.args.stream().forEach(a -> {
@@ -189,11 +231,12 @@ public class EvaluatorImplementer {
                 } else {
                     builtPattern = pattern.toString();
                 }
-
                 if (processFunction.warnExceptions.isEmpty() == false) {
                     builder.beginControlFlow("try");
                 }
+
                 builder.addStatement(builtPattern, args.toArray());
+
                 if (processFunction.warnExceptions.isEmpty() == false) {
                     String catchPattern = "catch ("
                         + processFunction.warnExceptions.stream().map(m -> "$T").collect(Collectors.joining(" | "))
@@ -403,7 +446,7 @@ public class EvaluatorImplementer {
         @Override
         public TypeName dataType(boolean blockStyle) {
             if (blockStyle) {
-                return blockType(type);
+                return isBlockType() ? type : blockType(type);
             }
             return vectorType(type);
         }
@@ -442,7 +485,7 @@ public class EvaluatorImplementer {
 
         @Override
         public void evalToBlock(MethodSpec.Builder builder) {
-            TypeName blockType = blockType(type);
+            TypeName blockType = isBlockType() ? type : blockType(type);
             builder.beginControlFlow("try ($T $LBlock = ($T) $L.eval(page))", blockType, name, blockType, name);
         }
 
@@ -474,6 +517,10 @@ public class EvaluatorImplementer {
             // nothing to do
         }
 
+        private boolean isBlockType() {
+            return EvaluatorImplementer.isBlockType(type);
+        }
+
         @Override
         public void buildInvocation(StringBuilder pattern, List<Object> args, boolean blockStyle) {
             if (type.equals(BYTES_REF)) {
@@ -488,14 +535,21 @@ public class EvaluatorImplementer {
                 return;
             }
             if (blockStyle) {
-                pattern.append("$L.$L($L.getFirstValueIndex(p))");
+                if (isBlockType()) {
+                    pattern.append("$L");
+                } else {
+                    pattern.append("$L.$L($L.getFirstValueIndex(p))");
+                }
             } else {
                 pattern.append("$L.$L(p)");
             }
             args.add(paramName(blockStyle));
-            args.add(getMethod(type));
-            if (blockStyle) {
-                args.add(paramName(true));
+            String method = isBlockType() ? null : getMethod(type);
+            if (method != null) {
+                args.add(method);
+                if (blockStyle) {
+                    args.add(paramName(true));
+                }
             }
         }
 
@@ -824,12 +878,101 @@ public class EvaluatorImplementer {
         }
     }
 
+    private record BlockProcessFunctionArg(TypeName type, String name) implements ProcessFunctionArg {
+        @Override
+        public TypeName dataType(boolean blockStyle) {
+            return type;
+        }
+
+        @Override
+        public String paramName(boolean blockStyle) {
+            return name + (blockStyle ? "Block" : "Vector");
+        }
+
+        @Override
+        public void declareField(TypeSpec.Builder builder) {
+            builder.addField(EXPRESSION_EVALUATOR, name, Modifier.PRIVATE, Modifier.FINAL);
+        }
+
+        @Override
+        public void declareFactoryField(TypeSpec.Builder builder) {
+            builder.addField(EXPRESSION_EVALUATOR_FACTORY, name, Modifier.PRIVATE, Modifier.FINAL);
+        }
+
+        @Override
+        public void implementCtor(MethodSpec.Builder builder) {
+            builder.addParameter(EXPRESSION_EVALUATOR, name);
+            builder.addStatement("this.$L = $L", name, name);
+        }
+
+        @Override
+        public void implementFactoryCtor(MethodSpec.Builder builder) {
+            builder.addParameter(EXPRESSION_EVALUATOR_FACTORY, name);
+            builder.addStatement("this.$L = $L", name, name);
+        }
+
+        @Override
+        public String factoryInvocation(MethodSpec.Builder factoryMethodBuilder) {
+            return name + ".get(context)";
+        }
+
+        @Override
+        public void evalToBlock(MethodSpec.Builder builder) {
+            builder.beginControlFlow("try ($T $LBlock = ($T) $L.eval(page))", type, name, type, name);
+        }
+
+        @Override
+        public void closeEvalToBlock(MethodSpec.Builder builder) {
+            builder.endControlFlow();
+        }
+
+        @Override
+        public void resolveVectors(MethodSpec.Builder builder, String invokeBlockEval) {
+            // nothing to do
+        }
+
+        @Override
+        public void createScratch(MethodSpec.Builder builder) {
+            // nothing to do
+        }
+
+        @Override
+        public void skipNull(MethodSpec.Builder builder) {
+            EvaluatorImplementer.skipNull(builder, paramName(true));
+        }
+
+        @Override
+        public void unpackValues(MethodSpec.Builder builder, boolean blockStyle) {
+            // nothing to do
+        }
+
+        @Override
+        public void buildInvocation(StringBuilder pattern, List<Object> args, boolean blockStyle) {
+            pattern.append("$L");
+            args.add(paramName(blockStyle));
+        }
+
+        @Override
+        public void buildToStringInvocation(StringBuilder pattern, List<Object> args, String prefix) {
+            pattern.append(" + $S + $L");
+            args.add(prefix + name + "=");
+            args.add(name);
+        }
+
+        @Override
+        public String closeInvocation() {
+            return name;
+        }
+    }
+
     private static class ProcessFunction {
         private final ExecutableElement function;
         private final List<ProcessFunctionArg> args;
         private final BuilderProcessFunctionArg builderArg;
         private final List<TypeMirror> warnExceptions;
 
+        private boolean hasBlockType;
+
         private ProcessFunction(
             Elements elements,
             javax.lang.model.util.Types types,
@@ -839,6 +982,7 @@ public class EvaluatorImplementer {
             this.function = function;
             args = new ArrayList<>();
             BuilderProcessFunctionArg builderArg = null;
+            hasBlockType = false;
             for (VariableElement v : function.getParameters()) {
                 TypeName type = TypeName.get(v.asType());
                 String name = v.getSimpleName().toString();
@@ -871,6 +1015,14 @@ public class EvaluatorImplementer {
                     args.add(new ArrayProcessFunctionArg(TypeName.get(componentType), name));
                     continue;
                 }
+                if (isBlockType(type)) {
+                    if (builderArg != null && args.size() == 2 && hasBlockType == false) {
+                        args.clear();
+                        hasBlockType = true;
+                    }
+                    args.add(new BlockProcessFunctionArg(type, name));
+                    continue;
+                }
                 args.add(new StandardProcessFunctionArg(type, name));
             }
             this.builderArg = builderArg;
@@ -885,4 +1037,12 @@ public class EvaluatorImplementer {
             return useBlockStyle ? blockType(TypeName.get(function.getReturnType())) : vectorType(TypeName.get(function.getReturnType()));
         }
     }
+
+    static boolean isBlockType(TypeName type) {
+        return type.equals(INT_BLOCK)
+            || type.equals(LONG_BLOCK)
+            || type.equals(DOUBLE_BLOCK)
+            || type.equals(BOOLEAN_BLOCK)
+            || type.equals(BYTES_REF_BLOCK);
+    }
 }

+ 23 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/boolean.csv-spec

@@ -232,3 +232,26 @@ emp_no:integer |languages:integer |byte2bool:boolean |short2bool:boolean
 10020          |null              |null              |null
 10030          |3                 |true              |true
 ;
+
+mvSlice#[skip:-8.13.99, reason:newly added in 8.14]
+row a = [true, false, false, true]
+| eval a1 = mv_slice(a, 1), a2 = mv_slice(a, 2, 3);
+
+a:boolean                  | a1:boolean | a2:boolean
+[true, false, false, true] | false      | [false, true]
+;
+
+mvSliceEmp#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval a1 = mv_slice(is_rehired, 0)
+| keep emp_no, is_rehired, a1
+| sort emp_no
+| limit 5;
+
+emp_no:integer | is_rehired:boolean       | a1:boolean
+10001          | [false,true]             | false
+10002          | [false,false]            | false
+10003          | null                     | null
+10004          | true                     | true
+10005          | [false,false,false,true] | false
+;

+ 15 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/floats.csv-spec

@@ -224,6 +224,21 @@ row a = [1.1, 2.1, 2.1] | eval da = mv_dedupe(a);
 [1.1, 2.1, 2.1] | [1.1, 2.1]
 ;
 
+mvSliceEmp#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval a1 = mv_slice(salary_change, 0, 1)
+| keep emp_no, salary_change, a1
+| sort emp_no
+| limit 5;
+
+emp_no:integer | salary_change:double    | a1:double
+10001          | 1.19                    | 1.19
+10002          | [-7.23,11.17]           | [-7.23,11.17]
+10003          | [12.82,14.68]           | [12.82,14.68]
+10004          | [-0.35,1.13,3.65,13.48] | [-0.35, 1.13]
+10005          | [-2.14,13.07]           | [-2.14,13.07]
+;
+
 autoBucket
 FROM employees
 | WHERE hire_date >= "1985-01-01T00:00:00Z" AND hire_date < "1986-01-01T00:00:00Z"

+ 145 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/ints.csv-spec

@@ -384,6 +384,151 @@ row a = [1, 2, 2, 3] | eval da = mv_dedupe(a);
 [1, 2, 2, 3] | [1, 2, 3]
 ;
 
+mvSlice#[skip:-8.13.99, reason:newly added in 8.14]
+// tag::mv_slice_positive[]
+row a = [1, 2, 2, 3]
+| eval a1 = mv_slice(a, 1), a2 = mv_slice(a, 2, 3)
+// end::mv_slice_positive[]
+;
+// tag::mv_slice_positive-result[]
+a:integer    | a1:integer | a2:integer
+[1, 2, 2, 3] | 2          | [2, 3]
+// end::mv_slice_positive-result[]
+;
+
+mvSliceNegativeOffset#[skip:-8.13.99, reason:newly added in 8.14]
+// tag::mv_slice_negative[]
+row a = [1, 2, 2, 3]
+| eval a1 = mv_slice(a, -2), a2 = mv_slice(a, -3, -1)
+// end::mv_slice_negative[]
+;
+// tag::mv_slice_negative-result[]
+a:integer    | a1:integer | a2:integer
+[1, 2, 2, 3] | 2          | [2, 2, 3]
+// end::mv_slice_negative-result[]
+;
+
+mvSliceSingle#[skip:-8.13.99, reason:newly added in 8.14]
+row a = 1
+| eval a1 = mv_slice(a, 0);
+
+a:integer | a1:integer
+1         | 1
+;
+
+mvSliceOutOfBound#[skip:-8.13.99, reason:newly added in 8.14]
+row a = [1, 2, 2, 3]
+| eval a1 = mv_slice(a, 4), a2 = mv_slice(a, 2, 6), a3 = mv_slice(a, 4, 6);
+
+a:integer    | a1:integer | a2:integer | a3:integer
+[1, 2, 2, 3] | null          | [2, 3]     | null
+;
+
+mvSliceEmpInt#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval a1 = mv_slice(salary_change.int, 0, 1)
+| keep emp_no, salary_change.int, a1
+| sort emp_no
+| limit 5;
+
+emp_no:integer | salary_change.int:integer | a1:integer
+10001          | 1                         | 1
+10002          | [-7, 11]                  | [-7, 11]
+10003          | [12, 14]                  | [12, 14]
+10004          | [0, 1, 3, 13]             | [0, 1]
+10005          | [-2, 13]                  | [-2, 13]
+;
+
+mvSliceEmpIntSingle#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval a1 = mv_slice(salary_change.int, 1)
+| keep emp_no, salary_change.int, a1
+| sort emp_no
+| limit 5;
+
+emp_no:integer | salary_change.int:integer | a1:integer
+10001          | 1                         | null
+10002          | [-7, 11]                  | 11
+10003          | [12, 14]                  | 14
+10004          | [0, 1, 3, 13]             | 1
+10005          | [-2, 13]                  | 13
+;
+
+mvSliceEmpIntEndOutOfBound#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval a1 = mv_slice(salary_change.int, 1, 4)
+| keep emp_no, salary_change.int, a1
+| sort emp_no
+| limit 5;
+
+emp_no:integer | salary_change.int:integer | a1:integer
+10001          | 1                         | null
+10002          | [-7, 11]                  | 11
+10003          | [12, 14]                  | 14
+10004          | [0, 1, 3, 13]             | [1, 3, 13]
+10005          | [-2, 13]                  | 13
+;
+
+mvSliceEmpIntOutOfBound#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval a1 = mv_slice(salary_change.int, 2, 4)
+| keep emp_no, salary_change.int, a1
+| sort emp_no
+| limit 5;
+
+emp_no:integer | salary_change.int:integer | a1:integer
+10001          | 1                         | null
+10002          | [-7, 11]                  | null
+10003          | [12, 14]                  | null
+10004          | [0, 1, 3, 13]             | [3, 13]
+10005          | [-2, 13]                  | null
+;
+
+mvSliceEmpIntStartOutOfBoundNegative#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval a1 = mv_slice(salary_change.int, -5, -2)
+| keep emp_no, salary_change.int, a1
+| sort emp_no
+| limit 5;
+
+emp_no:integer | salary_change.int:integer | a1:integer
+10001          | 1                         | null
+10002          | [-7, 11]                  | -7
+10003          | [12, 14]                  | 12
+10004          | [0, 1, 3, 13]             | [0, 1, 3]
+10005          | [-2, 13]                  | -2
+;
+
+mvSliceEmpIntOutOfBoundNegative#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval a1 = mv_slice(salary_change.int, -5, -3)
+| keep emp_no, salary_change.int, a1
+| sort emp_no
+| limit 5;
+
+emp_no:integer | salary_change.int:integer | a1:integer
+10001          | 1                         | null
+10002          | [-7, 11]                  | null
+10003          | [12, 14]                  | null
+10004          | [0, 1, 3, 13]             | [0, 1]
+10005          | [-2, 13]                  | null
+;
+
+mvSliceEmpLong#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval a1 = mv_slice(salary_change.long, 0, 1)
+| keep emp_no, salary_change.long, a1
+| sort emp_no
+| limit 5;
+
+emp_no:integer | salary_change.long:long | a1:long
+10001          | 1                       | 1
+10002          | [-7, 11]                | [-7, 11]
+10003          | [12, 14]                | [12, 14]
+10004          | [0, 1, 3, 13]           | [0, 1]
+10005          | [-2, 13]                | [-2, 13]
+;
+
 autoBucket
 // tag::auto_bucket[]
 FROM employees

+ 44 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec

@@ -277,3 +277,47 @@ lo0            |fe81::cae2:65ff:fece:feb9
 eth0           |127.0.0.3
 eth0           |fe80::cae2:65ff:fece:fec1
 ;
+
+mvSlice#[skip:-8.13.99, reason:newly added in 8.14]
+from hosts
+| where host == "epsilon"
+| eval a1 = mv_slice(ip1, 0, 1)
+| keep host, ip1, a1
+| sort host, ip1
+| limit 5;
+
+host:keyword | ip1:ip                                                 | a1:ip
+epsilon      | [127.0.0.1, 127.0.0.2, 127.0.0.3]                      | [127.0.0.1, 127.0.0.2]
+epsilon      | fe80::cae2:65ff:fece:fec1                              | fe80::cae2:65ff:fece:fec1
+epsilon      | [fe81::cae2:65ff:fece:feb9, fe82::cae2:65ff:fece:fec0] | [fe81::cae2:65ff:fece:feb9, fe82::cae2:65ff:fece:fec0]
+;
+
+mvSlice#[skip:-8.13.99, reason:newly added in 8.14]
+from hosts
+| where host == "epsilon"
+| eval a1 = mv_slice(ip1, 0, 1)
+| keep host, ip1, a1
+| sort host, ip1
+| limit 5;
+
+host:keyword | ip1:ip                                                 | a1:ip
+epsilon      | [127.0.0.1, 127.0.0.2, 127.0.0.3]                      | [127.0.0.1, 127.0.0.2]
+epsilon      | fe80::cae2:65ff:fece:fec1                              | fe80::cae2:65ff:fece:fec1
+epsilon      | [fe81::cae2:65ff:fece:feb9, fe82::cae2:65ff:fece:fec0] | [fe81::cae2:65ff:fece:feb9, fe82::cae2:65ff:fece:fec0]
+;
+
+mvZip#[skip:-8.13.99, reason:newly added in 8.14]
+from hosts
+| eval zip = mv_zip(to_string(description), to_string(ip0), "@@")
+| keep host, description, ip0, zip
+| sort host desc, ip0
+| limit 5
+;
+
+host:keyword | description:text              | ip0:ip                                                                            | zip:keyword
+gamma        | gamma k8s server              | fe80::cae2:65ff:fece:feb9                                                         | gamma k8s server@@fe80::cae2:65ff:fece:feb9
+gamma        | gamma k8s server              | fe80::cae2:65ff:fece:feb9                                                         | gamma k8s server@@fe80::cae2:65ff:fece:feb9
+epsilon      | epsilon gw instance           | [fe80::cae2:65ff:fece:feb9, fe80::cae2:65ff:fece:fec0, fe80::cae2:65ff:fece:fec1] | [epsilon gw instance@@fe80::cae2:65ff:fece:feb9, fe80::cae2:65ff:fece:fec0, fe80::cae2:65ff:fece:fec1]
+epsilon      | [epsilon host, epsilon2 host] | [fe81::cae2:65ff:fece:feb9, fe82::cae2:65ff:fece:fec0]                            | [epsilon host@@fe81::cae2:65ff:fece:feb9, epsilon2 host@@fe82::cae2:65ff:fece:fec0]
+epsilon      | null                          | null                                                                              | null
+;

+ 5 - 1
x-pack/plugin/esql/qa/testFixtures/src/main/resources/show.csv-spec

@@ -54,7 +54,9 @@ mv_last                  |"boolean|cartesian_point|cartesian_shape|date|double|g
 mv_max                   |"boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version mv_max(v:boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version)" |v | "boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version" | "" |"boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version"      | "Reduce a multivalued field to a single valued field containing the maximum value." | false                | false | false
 mv_median                |"double|integer|long|unsigned_long mv_median(v:double|integer|long|unsigned_long)"                                     |v                     |"double|integer|long|unsigned_long"                 | ""                                                 |"double|integer|long|unsigned_long"                    | "Converts a multivalued field into a single valued field containing the median value."                      | false                | false | false
 mv_min                   |"boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version mv_min(v:boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version)" |v | "boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version" | "" |"boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version"      | "Reduce a multivalued field to a single valued field containing the minimum value." | false                | false | false
+mv_slice                 |"boolean|cartesian_point|cartesian_shape|date|double|geo_point|geo_shape|integer|ip|keyword|long|text|version mv_slice(v:boolean|cartesian_point|cartesian_shape|date|double|geo_point|geo_shape|integer|ip|keyword|long|text|version, start:integer, ?end:integer)" |[v, start, end] | "[boolean|cartesian_point|cartesian_shape|date|double|geo_point|geo_shape|integer|ip|keyword|long|text|version, integer, integer]" | "[A multivalued field, start index, end index (included)]" |"boolean|cartesian_point|cartesian_shape|date|double|geo_point|geo_shape|integer|ip|keyword|long|text|version"   | "Returns a subset of the multivalued field using the start and end index values."                      | [false, false, true]                | false | false
 mv_sum                   |"double|integer|long|unsigned_long mv_sum(v:double|integer|long|unsigned_long)"                                        |v                     |"double|integer|long|unsigned_long"                 | ""                                                 |"double|integer|long|unsigned_long"                    | "Converts a multivalued field into a single valued field containing the sum of all of the values."                      | false                | false | false
+mv_zip                   |"keyword mv_zip(mvLeft:keyword|text, mvRight:keyword|text, ?delim:keyword|text)"          |[mvLeft, mvRight, delim] | ["keyword|text", "keyword|text", "keyword|text"] | [A multivalued field, A multivalued field, delimiter] | "keyword" | "Combines the values from two multivalued fields with a delimiter that joins them together." | [false, false, true] | false | false
 now                      |date now()                                                 | null                    |null              | null                                               |date                    | "Returns current date and time."                      | null                 | false | false
 percentile               |"double|integer|long percentile(field:double|integer|long, percentile:double|integer|long)"                            |[field, percentile]             |["double|integer|long, double|integer|long"]            |["", ""]                                            |"double|integer|long"                    | "The value at which a certain percentage of observed values occur."                      | [false, false]       | false | true
 pi                       |double pi()                                                  | null                    |  null            | null                                               |double                    | "The ratio of a circle’s circumference to its diameter."                      | null                 | false | false
@@ -153,7 +155,9 @@ double e()
 "boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version mv_max(v:boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version)"
 "double|integer|long|unsigned_long mv_median(v:double|integer|long|unsigned_long)"
 "boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version mv_min(v:boolean|date|double|integer|ip|keyword|long|text|unsigned_long|version)"
+"boolean|cartesian_point|cartesian_shape|date|double|geo_point|geo_shape|integer|ip|keyword|long|text|version mv_slice(v:boolean|cartesian_point|cartesian_shape|date|double|geo_point|geo_shape|integer|ip|keyword|long|text|version, start:integer, ?end:integer)"
 "double|integer|long|unsigned_long mv_sum(v:double|integer|long|unsigned_long)"
+"keyword mv_zip(mvLeft:keyword|text, mvRight:keyword|text, ?delim:keyword|text)"
 date now()
 "double|integer|long percentile(field:double|integer|long, percentile:double|integer|long)"
 double pi()
@@ -224,5 +228,5 @@ countFunctions#[skip:-8.13.99]
 show functions |  stats  a = count(*), b = count(*), c = count(*) |  mv_expand c;
 
 a:long | b:long | c:long
-92     | 92     | 92
+94     | 94     | 94
 ;

+ 44 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/string.csv-spec

@@ -696,6 +696,50 @@ ROW a=[10, 9, 8]
 // end::mv_concat-to_string-result[]
 ;
 
+mvSliceEmp#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval a1 = mv_slice(salary_change.keyword, 0, 1)
+| keep emp_no, salary_change.keyword, a1
+| sort emp_no
+| limit 5;
+
+emp_no:integer | salary_change.keyword:keyword | a1:keyword
+10001          | 1.19                          | 1.19
+10002          | [-7.23,11.17]                 | [-7.23,11.17]
+10003          | [12.82,14.68]                 | [12.82,14.68]
+10004          | [-0.35,1.13,13.48,3.65]       | [-0.35,1.13]
+10005          | [-2.14,13.07]                 | [-2.14,13.07]
+;
+
+mvZip#[skip:-8.13.99, reason:newly added in 8.14]
+// tag::mv_zip[]
+ROW a = ["x", "y", "z"], b = ["1", "2"]
+| EVAL c = mv_zip(a, b, "-")
+| KEEP a, b, c
+// end::mv_zip[]
+;
+
+// tag::mv_zip-result[]
+a:keyword | b:keyword | c:keyword
+[x, y, z] | [1 ,2]    | [x-1, y-2, z]
+// end::mv_zip-result[]
+;
+
+mvZipEmp#[skip:-8.13.99, reason:newly added in 8.14]
+from employees
+| eval full_name = mv_zip(first_name, last_name, " "), full_name_2 = mv_zip(last_name, first_name), jobs = mv_zip(job_positions, salary_change.keyword, "#")
+| keep emp_no, full_name, full_name_2, job_positions, salary_change.keyword, jobs
+| sort emp_no
+| limit 5;
+
+emp_no:integer | full_name:keyword | full_name_2:keyword | job_positions:keyword                                                  | salary_change.keyword:keyword | jobs:keyword
+10001          | Georgi Facello    | Facello,Georgi      | [Accountant, Senior Python Developer]                                  | 1.19                          | [Accountant#1.19, Senior Python Developer]
+10002          | Bezalel Simmel    | Simmel,Bezalel      | Senior Team Lead                                                       | [-7.23,11.17]                 | [Senior Team Lead#-7.23, 11.17]
+10003          | Parto Bamford     | Bamford,Parto       | null                                                                   | [12.82, 14.68]                | [12.82, 14.68]
+10004          | Chirstian Koblick | Koblick,Chirstian   | [Head Human Resources, Reporting Analyst, Support Engineer, Tech Lead] | [-0.35, 1.13, 13.48, 3.65]    | [Head Human Resources#-0.35, Reporting Analyst#1.13, Support Engineer#13.48, Tech Lead#3.65]
+10005          | Kyoichi Maliniak  | Maliniak,Kyoichi    | null                                                                   | [-2.14,13.07]                 | [-2.14,13.07]
+;
+
 showTextFields
 from hosts | where host == "beta" | keep host, host_group, description;
 ignoreOrder:true

+ 140 - 0
x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceBooleanEvaluator.java

@@ -0,0 +1,140 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import java.lang.IllegalArgumentException;
+import java.lang.Override;
+import java.lang.String;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.esql.expression.function.Warnings;
+import org.elasticsearch.xpack.ql.InvalidArgumentException;
+import org.elasticsearch.xpack.ql.tree.Source;
+
+/**
+ * {@link EvalOperator.ExpressionEvaluator} implementation for {@link MvSlice}.
+ * This class is generated. Do not edit it.
+ */
+public final class MvSliceBooleanEvaluator implements EvalOperator.ExpressionEvaluator {
+  private final Warnings warnings;
+
+  private final EvalOperator.ExpressionEvaluator field;
+
+  private final EvalOperator.ExpressionEvaluator start;
+
+  private final EvalOperator.ExpressionEvaluator end;
+
+  private final DriverContext driverContext;
+
+  public MvSliceBooleanEvaluator(Source source, EvalOperator.ExpressionEvaluator field,
+      EvalOperator.ExpressionEvaluator start, EvalOperator.ExpressionEvaluator end,
+      DriverContext driverContext) {
+    this.warnings = new Warnings(source);
+    this.field = field;
+    this.start = start;
+    this.end = end;
+    this.driverContext = driverContext;
+  }
+
+  @Override
+  public Block eval(Page page) {
+    try (BooleanBlock fieldBlock = (BooleanBlock) field.eval(page)) {
+      try (IntBlock startBlock = (IntBlock) start.eval(page)) {
+        try (IntBlock endBlock = (IntBlock) end.eval(page)) {
+          return eval(page.getPositionCount(), fieldBlock, startBlock, endBlock);
+        }
+      }
+    }
+  }
+
+  public BooleanBlock eval(int positionCount, BooleanBlock fieldBlock, IntBlock startBlock,
+      IntBlock endBlock) {
+    try(BooleanBlock.Builder result = driverContext.blockFactory().newBooleanBlockBuilder(positionCount)) {
+      position: for (int p = 0; p < positionCount; p++) {
+        boolean allBlocksAreNulls = true;
+        if (!fieldBlock.isNull(p)) {
+          allBlocksAreNulls = false;
+        }
+        if (startBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (startBlock.getValueCount(p) != 1) {
+          if (startBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (endBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (endBlock.getValueCount(p) != 1) {
+          if (endBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (allBlocksAreNulls) {
+          result.appendNull();
+          continue position;
+        }
+        try {
+          MvSlice.process(result, p, fieldBlock, startBlock.getInt(startBlock.getFirstValueIndex(p)), endBlock.getInt(endBlock.getFirstValueIndex(p)));
+        } catch (InvalidArgumentException e) {
+          warnings.registerException(e);
+          result.appendNull();
+        }
+      }
+      return result.build();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MvSliceBooleanEvaluator[" + "field=" + field + ", start=" + start + ", end=" + end + "]";
+  }
+
+  @Override
+  public void close() {
+    Releasables.closeExpectNoException(field, start, end);
+  }
+
+  static class Factory implements EvalOperator.ExpressionEvaluator.Factory {
+    private final Source source;
+
+    private final EvalOperator.ExpressionEvaluator.Factory field;
+
+    private final EvalOperator.ExpressionEvaluator.Factory start;
+
+    private final EvalOperator.ExpressionEvaluator.Factory end;
+
+    public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory field,
+        EvalOperator.ExpressionEvaluator.Factory start,
+        EvalOperator.ExpressionEvaluator.Factory end) {
+      this.source = source;
+      this.field = field;
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public MvSliceBooleanEvaluator get(DriverContext context) {
+      return new MvSliceBooleanEvaluator(source, field.get(context), start.get(context), end.get(context), context);
+    }
+
+    @Override
+    public String toString() {
+      return "MvSliceBooleanEvaluator[" + "field=" + field + ", start=" + start + ", end=" + end + "]";
+    }
+  }
+}

+ 140 - 0
x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceBytesRefEvaluator.java

@@ -0,0 +1,140 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import java.lang.IllegalArgumentException;
+import java.lang.Override;
+import java.lang.String;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.esql.expression.function.Warnings;
+import org.elasticsearch.xpack.ql.InvalidArgumentException;
+import org.elasticsearch.xpack.ql.tree.Source;
+
+/**
+ * {@link EvalOperator.ExpressionEvaluator} implementation for {@link MvSlice}.
+ * This class is generated. Do not edit it.
+ */
+public final class MvSliceBytesRefEvaluator implements EvalOperator.ExpressionEvaluator {
+  private final Warnings warnings;
+
+  private final EvalOperator.ExpressionEvaluator field;
+
+  private final EvalOperator.ExpressionEvaluator start;
+
+  private final EvalOperator.ExpressionEvaluator end;
+
+  private final DriverContext driverContext;
+
+  public MvSliceBytesRefEvaluator(Source source, EvalOperator.ExpressionEvaluator field,
+      EvalOperator.ExpressionEvaluator start, EvalOperator.ExpressionEvaluator end,
+      DriverContext driverContext) {
+    this.warnings = new Warnings(source);
+    this.field = field;
+    this.start = start;
+    this.end = end;
+    this.driverContext = driverContext;
+  }
+
+  @Override
+  public Block eval(Page page) {
+    try (BytesRefBlock fieldBlock = (BytesRefBlock) field.eval(page)) {
+      try (IntBlock startBlock = (IntBlock) start.eval(page)) {
+        try (IntBlock endBlock = (IntBlock) end.eval(page)) {
+          return eval(page.getPositionCount(), fieldBlock, startBlock, endBlock);
+        }
+      }
+    }
+  }
+
+  public BytesRefBlock eval(int positionCount, BytesRefBlock fieldBlock, IntBlock startBlock,
+      IntBlock endBlock) {
+    try(BytesRefBlock.Builder result = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) {
+      position: for (int p = 0; p < positionCount; p++) {
+        boolean allBlocksAreNulls = true;
+        if (!fieldBlock.isNull(p)) {
+          allBlocksAreNulls = false;
+        }
+        if (startBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (startBlock.getValueCount(p) != 1) {
+          if (startBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (endBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (endBlock.getValueCount(p) != 1) {
+          if (endBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (allBlocksAreNulls) {
+          result.appendNull();
+          continue position;
+        }
+        try {
+          MvSlice.process(result, p, fieldBlock, startBlock.getInt(startBlock.getFirstValueIndex(p)), endBlock.getInt(endBlock.getFirstValueIndex(p)));
+        } catch (InvalidArgumentException e) {
+          warnings.registerException(e);
+          result.appendNull();
+        }
+      }
+      return result.build();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MvSliceBytesRefEvaluator[" + "field=" + field + ", start=" + start + ", end=" + end + "]";
+  }
+
+  @Override
+  public void close() {
+    Releasables.closeExpectNoException(field, start, end);
+  }
+
+  static class Factory implements EvalOperator.ExpressionEvaluator.Factory {
+    private final Source source;
+
+    private final EvalOperator.ExpressionEvaluator.Factory field;
+
+    private final EvalOperator.ExpressionEvaluator.Factory start;
+
+    private final EvalOperator.ExpressionEvaluator.Factory end;
+
+    public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory field,
+        EvalOperator.ExpressionEvaluator.Factory start,
+        EvalOperator.ExpressionEvaluator.Factory end) {
+      this.source = source;
+      this.field = field;
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public MvSliceBytesRefEvaluator get(DriverContext context) {
+      return new MvSliceBytesRefEvaluator(source, field.get(context), start.get(context), end.get(context), context);
+    }
+
+    @Override
+    public String toString() {
+      return "MvSliceBytesRefEvaluator[" + "field=" + field + ", start=" + start + ", end=" + end + "]";
+    }
+  }
+}

+ 140 - 0
x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceDoubleEvaluator.java

@@ -0,0 +1,140 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import java.lang.IllegalArgumentException;
+import java.lang.Override;
+import java.lang.String;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.esql.expression.function.Warnings;
+import org.elasticsearch.xpack.ql.InvalidArgumentException;
+import org.elasticsearch.xpack.ql.tree.Source;
+
+/**
+ * {@link EvalOperator.ExpressionEvaluator} implementation for {@link MvSlice}.
+ * This class is generated. Do not edit it.
+ */
+public final class MvSliceDoubleEvaluator implements EvalOperator.ExpressionEvaluator {
+  private final Warnings warnings;
+
+  private final EvalOperator.ExpressionEvaluator field;
+
+  private final EvalOperator.ExpressionEvaluator start;
+
+  private final EvalOperator.ExpressionEvaluator end;
+
+  private final DriverContext driverContext;
+
+  public MvSliceDoubleEvaluator(Source source, EvalOperator.ExpressionEvaluator field,
+      EvalOperator.ExpressionEvaluator start, EvalOperator.ExpressionEvaluator end,
+      DriverContext driverContext) {
+    this.warnings = new Warnings(source);
+    this.field = field;
+    this.start = start;
+    this.end = end;
+    this.driverContext = driverContext;
+  }
+
+  @Override
+  public Block eval(Page page) {
+    try (DoubleBlock fieldBlock = (DoubleBlock) field.eval(page)) {
+      try (IntBlock startBlock = (IntBlock) start.eval(page)) {
+        try (IntBlock endBlock = (IntBlock) end.eval(page)) {
+          return eval(page.getPositionCount(), fieldBlock, startBlock, endBlock);
+        }
+      }
+    }
+  }
+
+  public DoubleBlock eval(int positionCount, DoubleBlock fieldBlock, IntBlock startBlock,
+      IntBlock endBlock) {
+    try(DoubleBlock.Builder result = driverContext.blockFactory().newDoubleBlockBuilder(positionCount)) {
+      position: for (int p = 0; p < positionCount; p++) {
+        boolean allBlocksAreNulls = true;
+        if (!fieldBlock.isNull(p)) {
+          allBlocksAreNulls = false;
+        }
+        if (startBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (startBlock.getValueCount(p) != 1) {
+          if (startBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (endBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (endBlock.getValueCount(p) != 1) {
+          if (endBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (allBlocksAreNulls) {
+          result.appendNull();
+          continue position;
+        }
+        try {
+          MvSlice.process(result, p, fieldBlock, startBlock.getInt(startBlock.getFirstValueIndex(p)), endBlock.getInt(endBlock.getFirstValueIndex(p)));
+        } catch (InvalidArgumentException e) {
+          warnings.registerException(e);
+          result.appendNull();
+        }
+      }
+      return result.build();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MvSliceDoubleEvaluator[" + "field=" + field + ", start=" + start + ", end=" + end + "]";
+  }
+
+  @Override
+  public void close() {
+    Releasables.closeExpectNoException(field, start, end);
+  }
+
+  static class Factory implements EvalOperator.ExpressionEvaluator.Factory {
+    private final Source source;
+
+    private final EvalOperator.ExpressionEvaluator.Factory field;
+
+    private final EvalOperator.ExpressionEvaluator.Factory start;
+
+    private final EvalOperator.ExpressionEvaluator.Factory end;
+
+    public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory field,
+        EvalOperator.ExpressionEvaluator.Factory start,
+        EvalOperator.ExpressionEvaluator.Factory end) {
+      this.source = source;
+      this.field = field;
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public MvSliceDoubleEvaluator get(DriverContext context) {
+      return new MvSliceDoubleEvaluator(source, field.get(context), start.get(context), end.get(context), context);
+    }
+
+    @Override
+    public String toString() {
+      return "MvSliceDoubleEvaluator[" + "field=" + field + ", start=" + start + ", end=" + end + "]";
+    }
+  }
+}

+ 139 - 0
x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceIntEvaluator.java

@@ -0,0 +1,139 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import java.lang.IllegalArgumentException;
+import java.lang.Override;
+import java.lang.String;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.esql.expression.function.Warnings;
+import org.elasticsearch.xpack.ql.InvalidArgumentException;
+import org.elasticsearch.xpack.ql.tree.Source;
+
+/**
+ * {@link EvalOperator.ExpressionEvaluator} implementation for {@link MvSlice}.
+ * This class is generated. Do not edit it.
+ */
+public final class MvSliceIntEvaluator implements EvalOperator.ExpressionEvaluator {
+  private final Warnings warnings;
+
+  private final EvalOperator.ExpressionEvaluator field;
+
+  private final EvalOperator.ExpressionEvaluator start;
+
+  private final EvalOperator.ExpressionEvaluator end;
+
+  private final DriverContext driverContext;
+
+  public MvSliceIntEvaluator(Source source, EvalOperator.ExpressionEvaluator field,
+      EvalOperator.ExpressionEvaluator start, EvalOperator.ExpressionEvaluator end,
+      DriverContext driverContext) {
+    this.warnings = new Warnings(source);
+    this.field = field;
+    this.start = start;
+    this.end = end;
+    this.driverContext = driverContext;
+  }
+
+  @Override
+  public Block eval(Page page) {
+    try (IntBlock fieldBlock = (IntBlock) field.eval(page)) {
+      try (IntBlock startBlock = (IntBlock) start.eval(page)) {
+        try (IntBlock endBlock = (IntBlock) end.eval(page)) {
+          return eval(page.getPositionCount(), fieldBlock, startBlock, endBlock);
+        }
+      }
+    }
+  }
+
+  public IntBlock eval(int positionCount, IntBlock fieldBlock, IntBlock startBlock,
+      IntBlock endBlock) {
+    try(IntBlock.Builder result = driverContext.blockFactory().newIntBlockBuilder(positionCount)) {
+      position: for (int p = 0; p < positionCount; p++) {
+        boolean allBlocksAreNulls = true;
+        if (!fieldBlock.isNull(p)) {
+          allBlocksAreNulls = false;
+        }
+        if (startBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (startBlock.getValueCount(p) != 1) {
+          if (startBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (endBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (endBlock.getValueCount(p) != 1) {
+          if (endBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (allBlocksAreNulls) {
+          result.appendNull();
+          continue position;
+        }
+        try {
+          MvSlice.process(result, p, fieldBlock, startBlock.getInt(startBlock.getFirstValueIndex(p)), endBlock.getInt(endBlock.getFirstValueIndex(p)));
+        } catch (InvalidArgumentException e) {
+          warnings.registerException(e);
+          result.appendNull();
+        }
+      }
+      return result.build();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MvSliceIntEvaluator[" + "field=" + field + ", start=" + start + ", end=" + end + "]";
+  }
+
+  @Override
+  public void close() {
+    Releasables.closeExpectNoException(field, start, end);
+  }
+
+  static class Factory implements EvalOperator.ExpressionEvaluator.Factory {
+    private final Source source;
+
+    private final EvalOperator.ExpressionEvaluator.Factory field;
+
+    private final EvalOperator.ExpressionEvaluator.Factory start;
+
+    private final EvalOperator.ExpressionEvaluator.Factory end;
+
+    public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory field,
+        EvalOperator.ExpressionEvaluator.Factory start,
+        EvalOperator.ExpressionEvaluator.Factory end) {
+      this.source = source;
+      this.field = field;
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public MvSliceIntEvaluator get(DriverContext context) {
+      return new MvSliceIntEvaluator(source, field.get(context), start.get(context), end.get(context), context);
+    }
+
+    @Override
+    public String toString() {
+      return "MvSliceIntEvaluator[" + "field=" + field + ", start=" + start + ", end=" + end + "]";
+    }
+  }
+}

+ 140 - 0
x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceLongEvaluator.java

@@ -0,0 +1,140 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import java.lang.IllegalArgumentException;
+import java.lang.Override;
+import java.lang.String;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.esql.expression.function.Warnings;
+import org.elasticsearch.xpack.ql.InvalidArgumentException;
+import org.elasticsearch.xpack.ql.tree.Source;
+
+/**
+ * {@link EvalOperator.ExpressionEvaluator} implementation for {@link MvSlice}.
+ * This class is generated. Do not edit it.
+ */
+public final class MvSliceLongEvaluator implements EvalOperator.ExpressionEvaluator {
+  private final Warnings warnings;
+
+  private final EvalOperator.ExpressionEvaluator field;
+
+  private final EvalOperator.ExpressionEvaluator start;
+
+  private final EvalOperator.ExpressionEvaluator end;
+
+  private final DriverContext driverContext;
+
+  public MvSliceLongEvaluator(Source source, EvalOperator.ExpressionEvaluator field,
+      EvalOperator.ExpressionEvaluator start, EvalOperator.ExpressionEvaluator end,
+      DriverContext driverContext) {
+    this.warnings = new Warnings(source);
+    this.field = field;
+    this.start = start;
+    this.end = end;
+    this.driverContext = driverContext;
+  }
+
+  @Override
+  public Block eval(Page page) {
+    try (LongBlock fieldBlock = (LongBlock) field.eval(page)) {
+      try (IntBlock startBlock = (IntBlock) start.eval(page)) {
+        try (IntBlock endBlock = (IntBlock) end.eval(page)) {
+          return eval(page.getPositionCount(), fieldBlock, startBlock, endBlock);
+        }
+      }
+    }
+  }
+
+  public LongBlock eval(int positionCount, LongBlock fieldBlock, IntBlock startBlock,
+      IntBlock endBlock) {
+    try(LongBlock.Builder result = driverContext.blockFactory().newLongBlockBuilder(positionCount)) {
+      position: for (int p = 0; p < positionCount; p++) {
+        boolean allBlocksAreNulls = true;
+        if (!fieldBlock.isNull(p)) {
+          allBlocksAreNulls = false;
+        }
+        if (startBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (startBlock.getValueCount(p) != 1) {
+          if (startBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (endBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (endBlock.getValueCount(p) != 1) {
+          if (endBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (allBlocksAreNulls) {
+          result.appendNull();
+          continue position;
+        }
+        try {
+          MvSlice.process(result, p, fieldBlock, startBlock.getInt(startBlock.getFirstValueIndex(p)), endBlock.getInt(endBlock.getFirstValueIndex(p)));
+        } catch (InvalidArgumentException e) {
+          warnings.registerException(e);
+          result.appendNull();
+        }
+      }
+      return result.build();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MvSliceLongEvaluator[" + "field=" + field + ", start=" + start + ", end=" + end + "]";
+  }
+
+  @Override
+  public void close() {
+    Releasables.closeExpectNoException(field, start, end);
+  }
+
+  static class Factory implements EvalOperator.ExpressionEvaluator.Factory {
+    private final Source source;
+
+    private final EvalOperator.ExpressionEvaluator.Factory field;
+
+    private final EvalOperator.ExpressionEvaluator.Factory start;
+
+    private final EvalOperator.ExpressionEvaluator.Factory end;
+
+    public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory field,
+        EvalOperator.ExpressionEvaluator.Factory start,
+        EvalOperator.ExpressionEvaluator.Factory end) {
+      this.source = source;
+      this.field = field;
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public MvSliceLongEvaluator get(DriverContext context) {
+      return new MvSliceLongEvaluator(source, field.get(context), start.get(context), end.get(context), context);
+    }
+
+    @Override
+    public String toString() {
+      return "MvSliceLongEvaluator[" + "field=" + field + ", start=" + start + ", end=" + end + "]";
+    }
+  }
+}

+ 127 - 0
x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvZipEvaluator.java

@@ -0,0 +1,127 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import java.lang.IllegalArgumentException;
+import java.lang.Override;
+import java.lang.String;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.esql.expression.function.Warnings;
+import org.elasticsearch.xpack.ql.tree.Source;
+
+/**
+ * {@link EvalOperator.ExpressionEvaluator} implementation for {@link MvZip}.
+ * This class is generated. Do not edit it.
+ */
+public final class MvZipEvaluator implements EvalOperator.ExpressionEvaluator {
+  private final Warnings warnings;
+
+  private final EvalOperator.ExpressionEvaluator leftField;
+
+  private final EvalOperator.ExpressionEvaluator rightField;
+
+  private final EvalOperator.ExpressionEvaluator delim;
+
+  private final DriverContext driverContext;
+
+  public MvZipEvaluator(Source source, EvalOperator.ExpressionEvaluator leftField,
+      EvalOperator.ExpressionEvaluator rightField, EvalOperator.ExpressionEvaluator delim,
+      DriverContext driverContext) {
+    this.warnings = new Warnings(source);
+    this.leftField = leftField;
+    this.rightField = rightField;
+    this.delim = delim;
+    this.driverContext = driverContext;
+  }
+
+  @Override
+  public Block eval(Page page) {
+    try (BytesRefBlock leftFieldBlock = (BytesRefBlock) leftField.eval(page)) {
+      try (BytesRefBlock rightFieldBlock = (BytesRefBlock) rightField.eval(page)) {
+        try (BytesRefBlock delimBlock = (BytesRefBlock) delim.eval(page)) {
+          return eval(page.getPositionCount(), leftFieldBlock, rightFieldBlock, delimBlock);
+        }
+      }
+    }
+  }
+
+  public BytesRefBlock eval(int positionCount, BytesRefBlock leftFieldBlock,
+      BytesRefBlock rightFieldBlock, BytesRefBlock delimBlock) {
+    try(BytesRefBlock.Builder result = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) {
+      BytesRef delimScratch = new BytesRef();
+      position: for (int p = 0; p < positionCount; p++) {
+        boolean allBlocksAreNulls = true;
+        if (!leftFieldBlock.isNull(p)) {
+          allBlocksAreNulls = false;
+        }
+        if (!rightFieldBlock.isNull(p)) {
+          allBlocksAreNulls = false;
+        }
+        if (delimBlock.isNull(p)) {
+          result.appendNull();
+          continue position;
+        }
+        if (delimBlock.getValueCount(p) != 1) {
+          if (delimBlock.getValueCount(p) > 1) {
+            warnings.registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+          }
+          result.appendNull();
+          continue position;
+        }
+        if (allBlocksAreNulls) {
+          result.appendNull();
+          continue position;
+        }
+        MvZip.process(result, p, leftFieldBlock, rightFieldBlock, delimBlock.getBytesRef(delimBlock.getFirstValueIndex(p), delimScratch));
+      }
+      return result.build();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "MvZipEvaluator[" + "leftField=" + leftField + ", rightField=" + rightField + ", delim=" + delim + "]";
+  }
+
+  @Override
+  public void close() {
+    Releasables.closeExpectNoException(leftField, rightField, delim);
+  }
+
+  static class Factory implements EvalOperator.ExpressionEvaluator.Factory {
+    private final Source source;
+
+    private final EvalOperator.ExpressionEvaluator.Factory leftField;
+
+    private final EvalOperator.ExpressionEvaluator.Factory rightField;
+
+    private final EvalOperator.ExpressionEvaluator.Factory delim;
+
+    public Factory(Source source, EvalOperator.ExpressionEvaluator.Factory leftField,
+        EvalOperator.ExpressionEvaluator.Factory rightField,
+        EvalOperator.ExpressionEvaluator.Factory delim) {
+      this.source = source;
+      this.leftField = leftField;
+      this.rightField = rightField;
+      this.delim = delim;
+    }
+
+    @Override
+    public MvZipEvaluator get(DriverContext context) {
+      return new MvZipEvaluator(source, leftField.get(context), rightField.get(context), delim.get(context), context);
+    }
+
+    @Override
+    public String toString() {
+      return "MvZipEvaluator[" + "leftField=" + leftField + ", rightField=" + rightField + ", delim=" + delim + "]";
+    }
+  }
+}

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

@@ -73,7 +73,9 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvLast
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMax;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMedian;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMin;
+import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvSlice;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvSum;
+import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvZip;
 import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
 import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.StX;
 import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.StY;
@@ -212,6 +214,8 @@ public final class EsqlFunctionRegistry extends FunctionRegistry {
                 def(MvMax.class, MvMax::new, "mv_max"),
                 def(MvMedian.class, MvMedian::new, "mv_median"),
                 def(MvMin.class, MvMin::new, "mv_min"),
+                def(MvSlice.class, MvSlice::new, "mv_slice"),
+                def(MvZip.class, MvZip::new, "mv_zip"),
                 def(MvSum.class, MvSum::new, "mv_sum"),
                 def(Split.class, Split::new, "split") } };
     }

+ 344 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSlice.java

@@ -0,0 +1,344 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.compute.ann.Evaluator;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper;
+import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
+import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.planner.PlannerUtils;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
+import org.elasticsearch.xpack.ql.InvalidArgumentException;
+import org.elasticsearch.xpack.ql.expression.Expression;
+import org.elasticsearch.xpack.ql.expression.function.OptionalArgument;
+import org.elasticsearch.xpack.ql.expression.function.scalar.ScalarFunction;
+import org.elasticsearch.xpack.ql.expression.gen.script.ScriptTemplate;
+import org.elasticsearch.xpack.ql.tree.NodeInfo;
+import org.elasticsearch.xpack.ql.tree.Source;
+import org.elasticsearch.xpack.ql.type.DataType;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.ParamOrdinal.FIRST;
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.ParamOrdinal.SECOND;
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.ParamOrdinal.THIRD;
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.isInteger;
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.isType;
+
+/**
+ * Returns a subset of the multivalued field using the start and end index values.
+ */
+public class MvSlice extends ScalarFunction implements OptionalArgument, EvaluatorMapper {
+    private final Expression field, start, end;
+
+    @FunctionInfo(
+        returnType = {
+            "boolean",
+            "cartesian_point",
+            "cartesian_shape",
+            "date",
+            "double",
+            "geo_point",
+            "geo_shape",
+            "integer",
+            "ip",
+            "keyword",
+            "long",
+            "text",
+            "version" },
+        description = "Returns a subset of the multivalued field using the start and end index values."
+    )
+    public MvSlice(
+        Source source,
+        @Param(
+            name = "v",
+            type = {
+                "boolean",
+                "cartesian_point",
+                "cartesian_shape",
+                "date",
+                "double",
+                "geo_point",
+                "geo_shape",
+                "integer",
+                "ip",
+                "keyword",
+                "long",
+                "text",
+                "version" },
+            description = "A multivalued field"
+        ) Expression field,
+        @Param(name = "start", type = { "integer" }, description = "start index") Expression start,
+        @Param(name = "end", type = { "integer" }, description = "end index (included)", optional = true) Expression end
+    ) {
+        super(source, end == null ? Arrays.asList(field, start, start) : Arrays.asList(field, start, end));
+        this.field = field;
+        this.start = start;
+        this.end = end == null ? start : end;
+    }
+
+    @Override
+    protected TypeResolution resolveType() {
+        if (childrenResolved() == false) {
+            return new TypeResolution("Unresolved children");
+        }
+
+        TypeResolution resolution = isType(field, EsqlDataTypes::isRepresentable, sourceText(), FIRST, "representable");
+        if (resolution.unresolved()) {
+            return resolution;
+        }
+
+        resolution = isInteger(start, sourceText(), SECOND);
+        if (resolution.unresolved()) {
+            return resolution;
+        }
+
+        if (end != null) {
+            resolution = isInteger(end, sourceText(), THIRD);
+            if (resolution.unresolved()) {
+                return resolution;
+            }
+        }
+
+        return resolution;
+    }
+
+    @Override
+    public boolean foldable() {
+        return field.foldable() && start.foldable() && (end == null || end.foldable());
+    }
+
+    @Override
+    public EvalOperator.ExpressionEvaluator.Factory toEvaluator(
+        Function<Expression, EvalOperator.ExpressionEvaluator.Factory> toEvaluator
+    ) {
+        if (start.foldable() && end.foldable()) {
+            int startOffset = Integer.parseInt(String.valueOf(start.fold()));
+            int endOffset = Integer.parseInt(String.valueOf(end.fold()));
+            checkStartEnd(startOffset, endOffset);
+        }
+        return switch (PlannerUtils.toElementType(field.dataType())) {
+            case BOOLEAN -> new MvSliceBooleanEvaluator.Factory(
+                source(),
+                toEvaluator.apply(field),
+                toEvaluator.apply(start),
+                toEvaluator.apply(end)
+            );
+            case BYTES_REF -> new MvSliceBytesRefEvaluator.Factory(
+                source(),
+                toEvaluator.apply(field),
+                toEvaluator.apply(start),
+                toEvaluator.apply(end)
+            );
+            case DOUBLE -> new MvSliceDoubleEvaluator.Factory(
+                source(),
+                toEvaluator.apply(field),
+                toEvaluator.apply(start),
+                toEvaluator.apply(end)
+            );
+            case INT -> new MvSliceIntEvaluator.Factory(
+                source(),
+                toEvaluator.apply(field),
+                toEvaluator.apply(start),
+                toEvaluator.apply(end)
+            );
+            case LONG -> new MvSliceLongEvaluator.Factory(
+                source(),
+                toEvaluator.apply(field),
+                toEvaluator.apply(start),
+                toEvaluator.apply(end)
+            );
+            case NULL -> EvalOperator.CONSTANT_NULL_FACTORY;
+            default -> throw EsqlIllegalArgumentException.illegalDataType(field.dataType());
+        };
+    }
+
+    @Override
+    public Object fold() {
+        return EvaluatorMapper.super.fold();
+    }
+
+    @Override
+    public Expression replaceChildren(List<Expression> newChildren) {
+        return new MvSlice(source(), newChildren.get(0), newChildren.get(1), newChildren.size() > 2 ? newChildren.get(2) : null);
+    }
+
+    @Override
+    protected NodeInfo<? extends Expression> info() {
+        return NodeInfo.create(this, MvSlice::new, field, start, end);
+    }
+
+    @Override
+    public DataType dataType() {
+        return field.dataType();
+    }
+
+    @Override
+    public ScriptTemplate asScript() {
+        throw new UnsupportedOperationException("functions do not support scripting");
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(field, start, end);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+        MvSlice other = (MvSlice) obj;
+        return Objects.equals(other.field, field) && Objects.equals(other.start, start) && Objects.equals(other.end, end);
+    }
+
+    static int adjustIndex(int oldOffset, int fieldValueCount, int first) {
+        return oldOffset < 0 ? oldOffset + fieldValueCount + first : oldOffset + first;
+    }
+
+    static void checkStartEnd(int start, int end) throws InvalidArgumentException {
+        if (start > end) {
+            throw new InvalidArgumentException("Start offset is greater than end offset");
+        }
+        if (start < 0 && end >= 0) {
+            throw new InvalidArgumentException("Start and end offset have different signs");
+        }
+    }
+
+    @Evaluator(extraName = "Boolean", warnExceptions = { InvalidArgumentException.class })
+    static void process(BooleanBlock.Builder builder, int position, BooleanBlock field, int start, int end) {
+        int fieldValueCount = field.getValueCount(position);
+        checkStartEnd(start, end);
+        int first = field.getFirstValueIndex(position);
+        int mvStartIndex = adjustIndex(start, fieldValueCount, first);
+        mvStartIndex = Math.max(first, mvStartIndex);
+        int mvEndIndex = adjustIndex(end, fieldValueCount, first);
+        mvEndIndex = Math.min(fieldValueCount + first - 1, mvEndIndex);
+        if (mvStartIndex >= fieldValueCount + first || mvEndIndex < first) {
+            builder.appendNull();
+            return;
+        }
+        if (mvStartIndex == mvEndIndex) {
+            builder.appendBoolean(field.getBoolean(mvStartIndex));
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = mvStartIndex; i <= mvEndIndex; i++) {
+            builder.appendBoolean(field.getBoolean(i));
+        }
+        builder.endPositionEntry();
+    }
+
+    @Evaluator(extraName = "Int", warnExceptions = { InvalidArgumentException.class })
+    static void process(IntBlock.Builder builder, int position, IntBlock field, int start, int end) {
+        int fieldValueCount = field.getValueCount(position);
+        checkStartEnd(start, end);
+        int first = field.getFirstValueIndex(position);
+        int mvStartIndex = adjustIndex(start, fieldValueCount, first);
+        mvStartIndex = Math.max(first, mvStartIndex);
+        int mvEndIndex = adjustIndex(end, fieldValueCount, first);
+        mvEndIndex = Math.min(fieldValueCount + first - 1, mvEndIndex);
+        if (mvStartIndex >= fieldValueCount + first || mvEndIndex < first) {
+            builder.appendNull();
+            return;
+        }
+        if (mvStartIndex == mvEndIndex) {
+            builder.appendInt(field.getInt(mvStartIndex));
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = mvStartIndex; i <= mvEndIndex; i++) {
+            builder.appendInt(field.getInt(i));
+        }
+        builder.endPositionEntry();
+    }
+
+    @Evaluator(extraName = "Long", warnExceptions = { InvalidArgumentException.class })
+    static void process(LongBlock.Builder builder, int position, LongBlock field, int start, int end) {
+        int fieldValueCount = field.getValueCount(position);
+        checkStartEnd(start, end);
+        int first = field.getFirstValueIndex(position);
+        int mvStartIndex = adjustIndex(start, fieldValueCount, first);
+        mvStartIndex = Math.max(first, mvStartIndex);
+        int mvEndIndex = adjustIndex(end, fieldValueCount, first);
+        mvEndIndex = Math.min(fieldValueCount + first - 1, mvEndIndex);
+        if (mvStartIndex >= fieldValueCount + first || mvEndIndex < first) {
+            builder.appendNull();
+            return;
+        }
+        if (mvStartIndex == mvEndIndex) {
+            builder.appendLong(field.getLong(mvStartIndex));
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = mvStartIndex; i <= mvEndIndex; i++) {
+            builder.appendLong(field.getLong(i));
+        }
+        builder.endPositionEntry();
+    }
+
+    @Evaluator(extraName = "Double", warnExceptions = { InvalidArgumentException.class })
+    static void process(DoubleBlock.Builder builder, int position, DoubleBlock field, int start, int end) {
+        int fieldValueCount = field.getValueCount(position);
+        checkStartEnd(start, end);
+        int first = field.getFirstValueIndex(position);
+        int mvStartIndex = adjustIndex(start, fieldValueCount, first);
+        mvStartIndex = Math.max(first, mvStartIndex);
+        int mvEndIndex = adjustIndex(end, fieldValueCount, first);
+        mvEndIndex = Math.min(fieldValueCount + first - 1, mvEndIndex);
+        if (mvStartIndex >= fieldValueCount + first || mvEndIndex < first) {
+            builder.appendNull();
+            return;
+        }
+        if (mvStartIndex == mvEndIndex) {
+            builder.appendDouble(field.getDouble(mvStartIndex));
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = mvStartIndex; i <= mvEndIndex; i++) {
+            builder.appendDouble(field.getDouble(i));
+        }
+        builder.endPositionEntry();
+    }
+
+    @Evaluator(extraName = "BytesRef", warnExceptions = { InvalidArgumentException.class })
+    static void process(BytesRefBlock.Builder builder, int position, BytesRefBlock field, int start, int end) {
+        int fieldValueCount = field.getValueCount(position);
+        checkStartEnd(start, end); // append null here ?
+        int first = field.getFirstValueIndex(position);
+        int mvStartIndex = adjustIndex(start, fieldValueCount, first);
+        mvStartIndex = Math.max(first, mvStartIndex);
+        int mvEndIndex = adjustIndex(end, fieldValueCount, first);
+        mvEndIndex = Math.min(fieldValueCount + first - 1, mvEndIndex);
+        if (mvStartIndex >= fieldValueCount + first || mvEndIndex < first) {
+            builder.appendNull();
+            return;
+        }
+        BytesRef fieldScratch = new BytesRef();
+        if (mvStartIndex == mvEndIndex) {
+            builder.appendBytesRef(field.getBytesRef(mvStartIndex, fieldScratch));
+            return;
+        }
+        builder.beginPositionEntry();
+        for (int i = mvStartIndex; i <= mvEndIndex; i++) {
+            builder.appendBytesRef(field.getBytesRef(i, fieldScratch));
+        }
+        builder.endPositionEntry();
+    }
+}

+ 211 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvZip.java

@@ -0,0 +1,211 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.compute.ann.Evaluator;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.xpack.esql.evaluator.mapper.EvaluatorMapper;
+import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
+import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.ql.expression.Expression;
+import org.elasticsearch.xpack.ql.expression.Literal;
+import org.elasticsearch.xpack.ql.expression.function.OptionalArgument;
+import org.elasticsearch.xpack.ql.expression.function.scalar.ScalarFunction;
+import org.elasticsearch.xpack.ql.expression.gen.script.ScriptTemplate;
+import org.elasticsearch.xpack.ql.tree.NodeInfo;
+import org.elasticsearch.xpack.ql.tree.Source;
+import org.elasticsearch.xpack.ql.type.DataType;
+import org.elasticsearch.xpack.ql.type.DataTypes;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.ParamOrdinal.FIRST;
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.ParamOrdinal.SECOND;
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.ParamOrdinal.THIRD;
+import static org.elasticsearch.xpack.ql.expression.TypeResolutions.isString;
+
+/**
+ * Combines the values from two multivalued fields with a delimiter that joins them together.
+ */
+public class MvZip extends ScalarFunction implements OptionalArgument, EvaluatorMapper {
+    private final Expression mvLeft, mvRight, delim;
+    private static final Literal COMMA = new Literal(Source.EMPTY, ",", DataTypes.TEXT);
+
+    @FunctionInfo(
+        returnType = { "keyword" },
+        description = "Combines the values from two multivalued fields with a delimiter that joins them together."
+    )
+    public MvZip(
+        Source source,
+        @Param(name = "mvLeft", type = { "keyword", "text" }, description = "A multivalued field") Expression mvLeft,
+        @Param(name = "mvRight", type = { "keyword", "text" }, description = "A multivalued field") Expression mvRight,
+        @Param(name = "delim", type = { "keyword", "text" }, description = "delimiter", optional = true) Expression delim
+    ) {
+        super(source, delim == null ? Arrays.asList(mvLeft, mvRight, COMMA) : Arrays.asList(mvLeft, mvRight, delim));
+        this.mvLeft = mvLeft;
+        this.mvRight = mvRight;
+        this.delim = delim == null ? COMMA : delim;
+    }
+
+    @Override
+    protected TypeResolution resolveType() {
+        if (childrenResolved() == false) {
+            return new TypeResolution("Unresolved children");
+        }
+
+        TypeResolution resolution = isString(mvLeft, sourceText(), FIRST);
+        if (resolution.unresolved()) {
+            return resolution;
+        }
+
+        resolution = isString(mvRight, sourceText(), SECOND);
+        if (resolution.unresolved()) {
+            return resolution;
+        }
+
+        if (delim != null) {
+            resolution = isString(delim, sourceText(), THIRD);
+            if (resolution.unresolved()) {
+                return resolution;
+            }
+        }
+
+        return resolution;
+    }
+
+    @Override
+    public boolean foldable() {
+        return mvLeft.foldable() && mvRight.foldable() && (delim == null || delim.foldable());
+    }
+
+    @Override
+    public EvalOperator.ExpressionEvaluator.Factory toEvaluator(
+        Function<Expression, EvalOperator.ExpressionEvaluator.Factory> toEvaluator
+    ) {
+        return new MvZipEvaluator.Factory(source(), toEvaluator.apply(mvLeft), toEvaluator.apply(mvRight), toEvaluator.apply(delim));
+    }
+
+    @Override
+    public Object fold() {
+        return EvaluatorMapper.super.fold();
+    }
+
+    @Override
+    public Expression replaceChildren(List<Expression> newChildren) {
+        return new MvZip(source(), newChildren.get(0), newChildren.get(1), newChildren.size() > 2 ? newChildren.get(2) : null);
+    }
+
+    @Override
+    protected NodeInfo<? extends Expression> info() {
+        return NodeInfo.create(this, MvZip::new, mvLeft, mvRight, delim);
+    }
+
+    @Override
+    public DataType dataType() {
+        return DataTypes.KEYWORD;
+    }
+
+    @Override
+    public ScriptTemplate asScript() {
+        throw new UnsupportedOperationException("functions do not support scripting");
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(mvLeft, mvRight, delim);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+        MvZip other = (MvZip) obj;
+        return Objects.equals(other.mvLeft, mvLeft) && Objects.equals(other.mvRight, mvRight) && Objects.equals(other.delim, delim);
+    }
+
+    private static void buildOneSide(BytesRefBlock.Builder builder, int start, int end, BytesRefBlock field, BytesRef fieldScratch) {
+        builder.beginPositionEntry();
+        for (int i = start; i < end; i++) {
+            builder.appendBytesRef(field.getBytesRef(i, fieldScratch));
+        }
+        builder.endPositionEntry();
+    }
+
+    @Evaluator
+    static void process(BytesRefBlock.Builder builder, int position, BytesRefBlock leftField, BytesRefBlock rightField, BytesRef delim) {
+        int leftFieldValueCount = leftField.getValueCount(position);
+        int rightFieldValueCount = rightField.getValueCount(position);
+
+        int leftFirst = leftField.getFirstValueIndex(position);
+        int rightFirst = rightField.getFirstValueIndex(position);
+
+        BytesRef fieldScratch = new BytesRef();
+
+        // nulls
+        if (leftField.isNull(position)) {
+            if (rightFieldValueCount == 1) {
+                builder.appendBytesRef(rightField.getBytesRef(rightFirst, fieldScratch));
+                return;
+            }
+            buildOneSide(builder, rightFirst, rightFirst + rightFieldValueCount, rightField, fieldScratch);
+            return;
+        }
+
+        if (rightField.isNull(position)) {
+            if (leftFieldValueCount == 1) {
+                builder.appendBytesRef(leftField.getBytesRef(leftFirst, fieldScratch));
+                return;
+            }
+            buildOneSide(builder, leftFirst, leftFirst + leftFieldValueCount, leftField, fieldScratch);
+            return;
+        }
+
+        BytesRefBuilder work = new BytesRefBuilder();
+        // single value
+        if (leftFieldValueCount == 1 && rightFieldValueCount == 1) {
+            work.append(leftField.getBytesRef(leftFirst, fieldScratch));
+            work.append(delim);
+            work.append(rightField.getBytesRef(rightFirst, fieldScratch));
+            builder.appendBytesRef(work.get());
+            return;
+        }
+        // multiple values
+        int leftIndex = 0, rightIndex = 0;
+        builder.beginPositionEntry();
+        while (leftIndex < leftFieldValueCount && rightIndex < rightFieldValueCount) {
+            // concat
+            work.clear();
+            work.append(leftField.getBytesRef(leftIndex + leftFirst, fieldScratch));
+            work.append(delim);
+            work.append(rightField.getBytesRef(rightIndex + rightFirst, fieldScratch));
+            builder.appendBytesRef(work.get());
+            leftIndex++;
+            rightIndex++;
+        }
+        while (leftIndex < leftFieldValueCount) {
+            work.clear();
+            work.append(leftField.getBytesRef(leftIndex + leftFirst, fieldScratch));
+            builder.appendBytesRef(work.get());
+            leftIndex++;
+        }
+        while (rightIndex < rightFieldValueCount) {
+            work.clear();
+            work.append(rightField.getBytesRef(rightIndex + rightFirst, fieldScratch));
+            builder.appendBytesRef(work.get());
+            rightIndex++;
+        }
+        builder.endPositionEntry();
+    }
+}

+ 30 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

@@ -97,7 +97,9 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvLast
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMax;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMedian;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMin;
+import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvSlice;
 import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvSum;
+import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvZip;
 import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
 import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.StX;
 import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.StY;
@@ -419,7 +421,9 @@ public final class PlanNamedTypes {
             of(ScalarFunction.class, MvMax.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
             of(ScalarFunction.class, MvMedian.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
             of(ScalarFunction.class, MvMin.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
+            of(ScalarFunction.class, MvSlice.class, PlanNamedTypes::writeMvSlice, PlanNamedTypes::readMvSlice),
             of(ScalarFunction.class, MvSum.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
+            of(ScalarFunction.class, MvZip.class, PlanNamedTypes::writeMvZip, PlanNamedTypes::readMvZip),
             // Expressions (other)
             of(Expression.class, Literal.class, PlanNamedTypes::writeLiteral, PlanNamedTypes::readLiteral),
             of(Expression.class, Order.class, PlanNamedTypes::writeOrder, PlanNamedTypes::readOrder)
@@ -1831,4 +1835,30 @@ public final class PlanNamedTypes {
         out.writeExpression(fields.get(0));
         out.writeOptionalWriteable(fields.size() == 2 ? o -> out.writeExpression(fields.get(1)) : null);
     }
+
+    static MvSlice readMvSlice(PlanStreamInput in) throws IOException {
+        return new MvSlice(in.readSource(), in.readExpression(), in.readExpression(), in.readOptionalNamed(Expression.class));
+    }
+
+    static void writeMvSlice(PlanStreamOutput out, MvSlice fn) throws IOException {
+        out.writeNoSource();
+        List<Expression> fields = fn.children();
+        assert fields.size() == 2 || fields.size() == 3;
+        out.writeExpression(fields.get(0));
+        out.writeExpression(fields.get(1));
+        out.writeOptionalWriteable(fields.size() == 3 ? o -> out.writeExpression(fields.get(2)) : null);
+    }
+
+    static MvZip readMvZip(PlanStreamInput in) throws IOException {
+        return new MvZip(in.readSource(), in.readExpression(), in.readExpression(), in.readOptionalNamed(Expression.class));
+    }
+
+    static void writeMvZip(PlanStreamOutput out, MvZip fn) throws IOException {
+        out.writeNoSource();
+        List<Expression> fields = fn.children();
+        assert fields.size() == 2 || fields.size() == 3;
+        out.writeExpression(fields.get(0));
+        out.writeExpression(fields.get(1));
+        out.writeOptionalWriteable(fields.size() == 3 ? o -> out.writeExpression(fields.get(2)) : null);
+    }
 }

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractFunctionTestCase.java

@@ -446,7 +446,7 @@ public abstract class AbstractFunctionTestCase extends ESTestCase {
 
     // TODO cranky time
 
-    public final void testSimpleWithNulls() { // TODO replace this with nulls inserted into the test case like anyNullIsNull
+    public void testSimpleWithNulls() { // TODO replace this with nulls inserted into the test case like anyNullIsNull
         assumeTrue("nothing to do if a type error", testCase.getExpectedTypeError() == null);
         assumeTrue("All test data types must be representable in order to build fields", testCase.allTypesAreRepresentable());
         List<Object> simpleData = testCase.getDataValues();

+ 346 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvSliceTests.java

@@ -0,0 +1,346 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import com.carrotsearch.randomizedtesting.annotations.Name;
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.geo.GeometryTestUtils;
+import org.elasticsearch.geo.ShapeTestUtils;
+import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
+import org.elasticsearch.xpack.esql.expression.function.scalar.AbstractScalarFunctionTestCase;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
+import org.elasticsearch.xpack.ql.expression.Expression;
+import org.elasticsearch.xpack.ql.tree.Source;
+import org.elasticsearch.xpack.ql.type.DataType;
+import org.elasticsearch.xpack.ql.type.DataTypes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.CARTESIAN;
+import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.GEO;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+
+public class MvSliceTests extends AbstractScalarFunctionTestCase {
+    public MvSliceTests(@Name("TestCase") Supplier<TestCaseSupplier.TestCase> testCaseSupplier) {
+        this.testCase = testCaseSupplier.get();
+    }
+
+    @ParametersFactory
+    public static Iterable<Object[]> parameters() {
+        List<TestCaseSupplier> suppliers = new ArrayList<>();
+        booleans(suppliers);
+        ints(suppliers);
+        longs(suppliers);
+        doubles(suppliers);
+        bytesRefs(suppliers);
+        return parameterSuppliersFromTypedData(suppliers);
+    }
+
+    @Override
+    protected DataType expectedType(List<DataType> argTypes) {
+        return argTypes.get(0);
+    }
+
+    @Override
+    protected List<ArgumentSpec> argSpec() {
+        return List.of(required(representableTypes()), required(integers()), optional(integers()));
+    }
+
+    @Override
+    protected Expression build(Source source, List<Expression> args) {
+        return new MvSlice(source, args.get(0), args.get(1), args.size() > 2 ? args.get(2) : null);
+    }
+
+    private static void booleans(List<TestCaseSupplier> suppliers) {
+        // Positive
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.BOOLEAN, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Boolean> field = randomList(1, 10, () -> randomBoolean());
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.BOOLEAN, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBooleanEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.BOOLEAN,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+        // Positive Start IndexOutofBound
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.BOOLEAN, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Boolean> field = randomList(1, 10, () -> randomBoolean());
+            int length = field.size();
+            int start = randomIntBetween(length, length + 1);
+            int end = randomIntBetween(start, length + 10);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.BOOLEAN, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBooleanEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.BOOLEAN,
+                nullValue()
+            );
+        }));
+        // Positive End IndexOutofBound
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.BOOLEAN, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Boolean> field = randomList(1, 10, () -> randomBoolean());
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(length, length + 10);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.BOOLEAN, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBooleanEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.BOOLEAN,
+                equalTo(start == length - 1 ? field.get(start) : field.subList(start, length))
+            );
+        }));
+        // Negative
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.BOOLEAN, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Boolean> field = randomList(1, 10, () -> randomBoolean());
+            int length = field.size();
+            int start = randomIntBetween(0 - length, -1);
+            int end = randomIntBetween(start, -1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.BOOLEAN, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBooleanEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.BOOLEAN,
+                equalTo(start == end ? field.get(start + length) : field.subList(start + length, end + 1 + length))
+            );
+        }));
+    }
+
+    private static void ints(List<TestCaseSupplier> suppliers) {
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.INTEGER, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Integer> field = randomList(1, 10, () -> randomInt());
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.INTEGER, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceIntEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.INTEGER,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+    }
+
+    private static void longs(List<TestCaseSupplier> suppliers) {
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.LONG, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Long> field = randomList(1, 10, () -> randomLong());
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.LONG, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceLongEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.LONG,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.DATETIME, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Long> field = randomList(1, 10, () -> randomLong());
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.DATETIME, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceLongEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.DATETIME,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+    }
+
+    private static void doubles(List<TestCaseSupplier> suppliers) {
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.DOUBLE, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Double> field = randomList(1, 10, () -> randomDouble());
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.DOUBLE, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceDoubleEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.DOUBLE,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+    }
+
+    private static void bytesRefs(List<TestCaseSupplier> suppliers) {
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.KEYWORD, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Object> field = randomList(1, 10, () -> randomLiteral(DataTypes.KEYWORD).value());
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.KEYWORD, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBytesRefEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.KEYWORD,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.TEXT, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Object> field = randomList(1, 10, () -> randomLiteral(DataTypes.TEXT).value());
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.TEXT, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBytesRefEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.TEXT,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.IP, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Object> field = randomList(1, 10, () -> randomLiteral(DataTypes.IP).value());
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.IP, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBytesRefEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.IP,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.VERSION, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Object> field = randomList(1, 10, () -> randomLiteral(DataTypes.VERSION).value());
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, DataTypes.VERSION, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBytesRefEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                DataTypes.VERSION,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+
+        suppliers.add(new TestCaseSupplier(List.of(EsqlDataTypes.GEO_POINT, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Object> field = randomList(1, 10, () -> new BytesRef(GEO.asWkt(GeometryTestUtils.randomPoint())));
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, EsqlDataTypes.GEO_POINT, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBytesRefEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                EsqlDataTypes.GEO_POINT,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+
+        suppliers.add(new TestCaseSupplier(List.of(EsqlDataTypes.CARTESIAN_POINT, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Object> field = randomList(1, 10, () -> new BytesRef(CARTESIAN.asWkt(ShapeTestUtils.randomPoint())));
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, EsqlDataTypes.CARTESIAN_POINT, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBytesRefEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                EsqlDataTypes.CARTESIAN_POINT,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+
+        suppliers.add(new TestCaseSupplier(List.of(EsqlDataTypes.GEO_SHAPE, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Object> field = randomList(1, 10, () -> new BytesRef(GEO.asWkt(GeometryTestUtils.randomGeometry(randomBoolean()))));
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, EsqlDataTypes.GEO_SHAPE, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBytesRefEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                EsqlDataTypes.GEO_SHAPE,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+
+        suppliers.add(new TestCaseSupplier(List.of(EsqlDataTypes.CARTESIAN_SHAPE, DataTypes.INTEGER, DataTypes.INTEGER), () -> {
+            List<Object> field = randomList(1, 10, () -> new BytesRef(CARTESIAN.asWkt(ShapeTestUtils.randomGeometry(randomBoolean()))));
+            int length = field.size();
+            int start = randomIntBetween(0, length - 1);
+            int end = randomIntBetween(start, length - 1);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(field, EsqlDataTypes.CARTESIAN_SHAPE, "field"),
+                    new TestCaseSupplier.TypedData(start, DataTypes.INTEGER, "start"),
+                    new TestCaseSupplier.TypedData(end, DataTypes.INTEGER, "end")
+                ),
+                "MvSliceBytesRefEvaluator[field=Attribute[channel=0], start=Attribute[channel=1], end=Attribute[channel=2]]",
+                EsqlDataTypes.CARTESIAN_SHAPE,
+                equalTo(start == end ? field.get(start) : field.subList(start, end + 1))
+            );
+        }));
+    }
+}

+ 120 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/multivalue/MvZipTests.java

@@ -0,0 +1,120 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.multivalue;
+
+import com.carrotsearch.randomizedtesting.annotations.Name;
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
+import org.elasticsearch.xpack.esql.expression.function.scalar.AbstractScalarFunctionTestCase;
+import org.elasticsearch.xpack.ql.expression.Expression;
+import org.elasticsearch.xpack.ql.tree.Source;
+import org.elasticsearch.xpack.ql.type.DataType;
+import org.elasticsearch.xpack.ql.type.DataTypes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static java.lang.Math.max;
+import static org.hamcrest.Matchers.equalTo;
+
+public class MvZipTests extends AbstractScalarFunctionTestCase {
+    public MvZipTests(@Name("TestCase") Supplier<TestCaseSupplier.TestCase> testCaseSupplier) {
+        this.testCase = testCaseSupplier.get();
+    }
+
+    @ParametersFactory
+    public static Iterable<Object[]> parameters() {
+        List<TestCaseSupplier> suppliers = new ArrayList<>();
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.KEYWORD, DataTypes.KEYWORD, DataTypes.KEYWORD), () -> {
+            List<Object> left = randomList(1, 3, () -> randomLiteral(DataTypes.KEYWORD).value());
+            List<Object> right = randomList(1, 3, () -> randomLiteral(DataTypes.KEYWORD).value());
+            String delim = randomAlphaOfLengthBetween(1, 1);
+            List<BytesRef> expected = calculateExpected(left, right, delim);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(left, DataTypes.KEYWORD, "mvLeft"),
+                    new TestCaseSupplier.TypedData(right, DataTypes.KEYWORD, "mvRight"),
+                    new TestCaseSupplier.TypedData(delim, DataTypes.KEYWORD, "delim")
+                ),
+                "MvZipEvaluator[leftField=Attribute[channel=0], rightField=Attribute[channel=1], delim=Attribute[channel=2]]",
+                DataTypes.KEYWORD,
+                equalTo(expected.size() == 1 ? expected.iterator().next() : expected)
+            );
+        }));
+
+        suppliers.add(new TestCaseSupplier(List.of(DataTypes.TEXT, DataTypes.TEXT, DataTypes.TEXT), () -> {
+            List<Object> left = randomList(1, 10, () -> randomLiteral(DataTypes.TEXT).value());
+            List<Object> right = randomList(1, 10, () -> randomLiteral(DataTypes.TEXT).value());
+            String delim = randomAlphaOfLengthBetween(1, 1);
+            List<BytesRef> expected = calculateExpected(left, right, delim);
+            return new TestCaseSupplier.TestCase(
+                List.of(
+                    new TestCaseSupplier.TypedData(left, DataTypes.TEXT, "mvLeft"),
+                    new TestCaseSupplier.TypedData(right, DataTypes.TEXT, "mvRight"),
+                    new TestCaseSupplier.TypedData(delim, DataTypes.TEXT, "delim")
+                ),
+                "MvZipEvaluator[leftField=Attribute[channel=0], rightField=Attribute[channel=1], delim=Attribute[channel=2]]",
+                DataTypes.KEYWORD,
+                equalTo(expected.size() == 1 ? expected.iterator().next() : expected)
+            );
+        }));
+
+        return parameterSuppliersFromTypedData(suppliers);
+    }
+
+    @Override
+    protected DataType expectedType(List<DataType> argTypes) {
+        return DataTypes.KEYWORD;
+    }
+
+    @Override
+    protected List<ArgumentSpec> argSpec() {
+        return List.of(required(strings()), required(strings()), optional(strings()));
+    }
+
+    @Override
+    protected Expression build(Source source, List<Expression> args) {
+        return new MvZip(source, args.get(0), args.get(1), args.size() > 2 ? args.get(2) : null);
+    }
+
+    private static List<BytesRef> calculateExpected(List<Object> left, List<Object> right, String delim) {
+        List<BytesRef> expected = new ArrayList<>(max(left.size(), right.size()));
+        int i = 0, j = 0;
+        while (i < left.size() && j < right.size()) {
+            BytesRefBuilder work = new BytesRefBuilder();
+            work.append((BytesRef) left.get(i));
+            work.append(new BytesRef(delim));
+            work.append((BytesRef) right.get(j));
+            expected.add(work.get());
+            i++;
+            j++;
+        }
+        while (i < left.size()) {
+            BytesRefBuilder work = new BytesRefBuilder();
+            work.append((BytesRef) left.get(i));
+            expected.add(work.get());
+            i++;
+        }
+        while (j < right.size()) {
+            BytesRefBuilder work = new BytesRefBuilder();
+            work.append((BytesRef) right.get(j));
+            expected.add(work.get());
+            j++;
+        }
+        return expected;
+    }
+
+    @Override
+    public void testSimpleWithNulls() {
+        assumeFalse("mv_zip returns null only if both left and right inputs are nulls", false);
+    }
+}