|  | @@ -7,7 +7,6 @@
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  package org.elasticsearch.xpack.esql.io.stream;
 |  |  package org.elasticsearch.xpack.esql.io.stream;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -import org.apache.lucene.util.BytesRef;
 |  | 
 | 
											
												
													
														|  |  import org.elasticsearch.TransportVersion;
 |  |  import org.elasticsearch.TransportVersion;
 | 
											
												
													
														|  |  import org.elasticsearch.TransportVersions;
 |  |  import org.elasticsearch.TransportVersions;
 | 
											
												
													
														|  |  import org.elasticsearch.common.TriFunction;
 |  |  import org.elasticsearch.common.TriFunction;
 | 
											
										
											
												
													
														|  | @@ -46,7 +45,6 @@ import org.elasticsearch.xpack.esql.core.plan.logical.Limit;
 | 
											
												
													
														|  |  import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
 |  |  import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
 | 
											
												
													
														|  |  import org.elasticsearch.xpack.esql.core.plan.logical.OrderBy;
 |  |  import org.elasticsearch.xpack.esql.core.plan.logical.OrderBy;
 | 
											
												
													
														|  |  import org.elasticsearch.xpack.esql.core.tree.Source;
 |  |  import org.elasticsearch.xpack.esql.core.tree.Source;
 | 
											
												
													
														|  | -import org.elasticsearch.xpack.esql.core.type.DataType;
 |  | 
 | 
											
												
													
														|  |  import org.elasticsearch.xpack.esql.core.type.EsField;
 |  |  import org.elasticsearch.xpack.esql.core.type.EsField;
 | 
											
												
													
														|  |  import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
 |  |  import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
 | 
											
												
													
														|  |  import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
 |  |  import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
 | 
											
										
											
												
													
														|  | @@ -178,10 +176,6 @@ import java.util.function.BiFunction;
 | 
											
												
													
														|  |  import java.util.function.Function;
 |  |  import java.util.function.Function;
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |  import static java.util.Map.entry;
 |  |  import static java.util.Map.entry;
 | 
											
												
													
														|  | -import static org.elasticsearch.xpack.esql.core.type.DataType.CARTESIAN_POINT;
 |  | 
 | 
											
												
													
														|  | -import static org.elasticsearch.xpack.esql.core.type.DataType.GEO_POINT;
 |  | 
 | 
											
												
													
														|  | -import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.CARTESIAN;
 |  | 
 | 
											
												
													
														|  | -import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.GEO;
 |  | 
 | 
											
												
													
														|  |  import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.Entry.of;
 |  |  import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.Entry.of;
 | 
											
												
													
														|  |  import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanReader.readerFromPlanReader;
 |  |  import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanReader.readerFromPlanReader;
 | 
											
												
													
														|  |  import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanWriter.writerFromPlanWriter;
 |  |  import static org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanWriter.writerFromPlanWriter;
 | 
											
										
											
												
													
														|  | @@ -363,10 +357,7 @@ public final class PlanNamedTypes {
 | 
											
												
													
														|  |              of(ScalarFunction.class, MvSort.class, PlanNamedTypes::writeMvSort, PlanNamedTypes::readMvSort),
 |  |              of(ScalarFunction.class, MvSort.class, PlanNamedTypes::writeMvSort, PlanNamedTypes::readMvSort),
 | 
											
												
													
														|  |              of(ScalarFunction.class, MvSlice.class, PlanNamedTypes::writeMvSlice, PlanNamedTypes::readMvSlice),
 |  |              of(ScalarFunction.class, MvSlice.class, PlanNamedTypes::writeMvSlice, PlanNamedTypes::readMvSlice),
 | 
											
												
													
														|  |              of(ScalarFunction.class, MvSum.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
 |  |              of(ScalarFunction.class, MvSum.class, PlanNamedTypes::writeMvFunction, PlanNamedTypes::readMvFunction),
 | 
											
												
													
														|  | -            of(ScalarFunction.class, MvZip.class, PlanNamedTypes::writeMvZip, PlanNamedTypes::readMvZip),
 |  | 
 | 
											
												
													
														|  | -            // Expressions (other)
 |  | 
 | 
											
												
													
														|  | -            of(Expression.class, Literal.class, PlanNamedTypes::writeLiteral, PlanNamedTypes::readLiteral),
 |  | 
 | 
											
												
													
														|  | -            of(Expression.class, Order.class, PlanNamedTypes::writeOrder, PlanNamedTypes::readOrder)
 |  | 
 | 
											
												
													
														|  | 
 |  | +            of(ScalarFunction.class, MvZip.class, PlanNamedTypes::writeMvZip, PlanNamedTypes::readMvZip)
 | 
											
												
													
														|  |          );
 |  |          );
 | 
											
												
													
														|  |          List<PlanNameRegistry.Entry> entries = new ArrayList<>(declared);
 |  |          List<PlanNameRegistry.Entry> entries = new ArrayList<>(declared);
 | 
											
												
													
														|  |  
 |  |  
 | 
											
										
											
												
													
														|  | @@ -378,6 +369,8 @@ public final class PlanNamedTypes {
 | 
											
												
													
														|  |              entries.add(of(Expression.class, e));
 |  |              entries.add(of(Expression.class, e));
 | 
											
												
													
														|  |          }
 |  |          }
 | 
											
												
													
														|  |          entries.add(of(Expression.class, UnsupportedAttribute.ENTRY));
 |  |          entries.add(of(Expression.class, UnsupportedAttribute.ENTRY));
 | 
											
												
													
														|  | 
 |  | +        entries.add(of(Expression.class, Literal.ENTRY));
 | 
											
												
													
														|  | 
 |  | +        entries.add(of(Expression.class, org.elasticsearch.xpack.esql.expression.Order.ENTRY));
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |          return entries;
 |  |          return entries;
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
										
											
												
													
														|  | @@ -678,14 +671,14 @@ public final class PlanNamedTypes {
 | 
											
												
													
														|  |          return new OrderExec(
 |  |          return new OrderExec(
 | 
											
												
													
														|  |              Source.readFrom(in),
 |  |              Source.readFrom(in),
 | 
											
												
													
														|  |              in.readPhysicalPlanNode(),
 |  |              in.readPhysicalPlanNode(),
 | 
											
												
													
														|  | -            in.readCollectionAsList(readerFromPlanReader(PlanNamedTypes::readOrder))
 |  | 
 | 
											
												
													
														|  | 
 |  | +            in.readCollectionAsList(org.elasticsearch.xpack.esql.expression.Order::new)
 | 
											
												
													
														|  |          );
 |  |          );
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      static void writeOrderExec(PlanStreamOutput out, OrderExec orderExec) throws IOException {
 |  |      static void writeOrderExec(PlanStreamOutput out, OrderExec orderExec) throws IOException {
 | 
											
												
													
														|  |          Source.EMPTY.writeTo(out);
 |  |          Source.EMPTY.writeTo(out);
 | 
											
												
													
														|  |          out.writePhysicalPlanNode(orderExec.child());
 |  |          out.writePhysicalPlanNode(orderExec.child());
 | 
											
												
													
														|  | -        out.writeCollection(orderExec.order(), writerFromPlanWriter(PlanNamedTypes::writeOrder));
 |  | 
 | 
											
												
													
														|  | 
 |  | +        out.writeCollection(orderExec.order());
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      static ProjectExec readProjectExec(PlanStreamInput in) throws IOException {
 |  |      static ProjectExec readProjectExec(PlanStreamInput in) throws IOException {
 | 
											
										
											
												
													
														|  | @@ -731,7 +724,7 @@ public final class PlanNamedTypes {
 | 
											
												
													
														|  |          return new TopNExec(
 |  |          return new TopNExec(
 | 
											
												
													
														|  |              Source.readFrom(in),
 |  |              Source.readFrom(in),
 | 
											
												
													
														|  |              in.readPhysicalPlanNode(),
 |  |              in.readPhysicalPlanNode(),
 | 
											
												
													
														|  | -            in.readCollectionAsList(readerFromPlanReader(PlanNamedTypes::readOrder)),
 |  | 
 | 
											
												
													
														|  | 
 |  | +            in.readCollectionAsList(org.elasticsearch.xpack.esql.expression.Order::new),
 | 
											
												
													
														|  |              in.readNamed(Expression.class),
 |  |              in.readNamed(Expression.class),
 | 
											
												
													
														|  |              in.readOptionalVInt()
 |  |              in.readOptionalVInt()
 | 
											
												
													
														|  |          );
 |  |          );
 | 
											
										
											
												
													
														|  | @@ -740,7 +733,7 @@ public final class PlanNamedTypes {
 | 
											
												
													
														|  |      static void writeTopNExec(PlanStreamOutput out, TopNExec topNExec) throws IOException {
 |  |      static void writeTopNExec(PlanStreamOutput out, TopNExec topNExec) throws IOException {
 | 
											
												
													
														|  |          Source.EMPTY.writeTo(out);
 |  |          Source.EMPTY.writeTo(out);
 | 
											
												
													
														|  |          out.writePhysicalPlanNode(topNExec.child());
 |  |          out.writePhysicalPlanNode(topNExec.child());
 | 
											
												
													
														|  | -        out.writeCollection(topNExec.order(), writerFromPlanWriter(PlanNamedTypes::writeOrder));
 |  | 
 | 
											
												
													
														|  | 
 |  | +        out.writeCollection(topNExec.order());
 | 
											
												
													
														|  |          out.writeExpression(topNExec.limit());
 |  |          out.writeExpression(topNExec.limit());
 | 
											
												
													
														|  |          out.writeOptionalVInt(topNExec.estimatedRowSize());
 |  |          out.writeOptionalVInt(topNExec.estimatedRowSize());
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
										
											
												
													
														|  | @@ -969,14 +962,14 @@ public final class PlanNamedTypes {
 | 
											
												
													
														|  |          return new OrderBy(
 |  |          return new OrderBy(
 | 
											
												
													
														|  |              Source.readFrom(in),
 |  |              Source.readFrom(in),
 | 
											
												
													
														|  |              in.readLogicalPlanNode(),
 |  |              in.readLogicalPlanNode(),
 | 
											
												
													
														|  | -            in.readCollectionAsList(readerFromPlanReader(PlanNamedTypes::readOrder))
 |  | 
 | 
											
												
													
														|  | 
 |  | +            in.readCollectionAsList(org.elasticsearch.xpack.esql.expression.Order::new)
 | 
											
												
													
														|  |          );
 |  |          );
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      static void writeOrderBy(PlanStreamOutput out, OrderBy order) throws IOException {
 |  |      static void writeOrderBy(PlanStreamOutput out, OrderBy order) throws IOException {
 | 
											
												
													
														|  |          Source.EMPTY.writeTo(out);
 |  |          Source.EMPTY.writeTo(out);
 | 
											
												
													
														|  |          out.writeLogicalPlanNode(order.child());
 |  |          out.writeLogicalPlanNode(order.child());
 | 
											
												
													
														|  | -        out.writeCollection(order.order(), writerFromPlanWriter(PlanNamedTypes::writeOrder));
 |  | 
 | 
											
												
													
														|  | 
 |  | +        out.writeCollection(order.order());
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      static Project readProject(PlanStreamInput in) throws IOException {
 |  |      static Project readProject(PlanStreamInput in) throws IOException {
 | 
											
										
											
												
													
														|  | @@ -993,15 +986,15 @@ public final class PlanNamedTypes {
 | 
											
												
													
														|  |          return new TopN(
 |  |          return new TopN(
 | 
											
												
													
														|  |              Source.readFrom(in),
 |  |              Source.readFrom(in),
 | 
											
												
													
														|  |              in.readLogicalPlanNode(),
 |  |              in.readLogicalPlanNode(),
 | 
											
												
													
														|  | -            in.readCollectionAsList(readerFromPlanReader(PlanNamedTypes::readOrder)),
 |  | 
 | 
											
												
													
														|  | -            in.readNamed(Expression.class)
 |  | 
 | 
											
												
													
														|  | 
 |  | +            in.readCollectionAsList(org.elasticsearch.xpack.esql.expression.Order::new),
 | 
											
												
													
														|  | 
 |  | +            in.readExpression()
 | 
											
												
													
														|  |          );
 |  |          );
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      static void writeTopN(PlanStreamOutput out, TopN topN) throws IOException {
 |  |      static void writeTopN(PlanStreamOutput out, TopN topN) throws IOException {
 | 
											
												
													
														|  |          Source.EMPTY.writeTo(out);
 |  |          Source.EMPTY.writeTo(out);
 | 
											
												
													
														|  |          out.writeLogicalPlanNode(topN.child());
 |  |          out.writeLogicalPlanNode(topN.child());
 | 
											
												
													
														|  | -        out.writeCollection(topN.order(), writerFromPlanWriter(PlanNamedTypes::writeOrder));
 |  | 
 | 
											
												
													
														|  | 
 |  | +        out.writeCollection(topN.order());
 | 
											
												
													
														|  |          out.writeExpression(topN.limit());
 |  |          out.writeExpression(topN.limit());
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
										
											
												
													
														|  | @@ -1552,83 +1545,6 @@ public final class PlanNamedTypes {
 | 
											
												
													
														|  |          out.writeExpression(fn.right());
 |  |          out.writeExpression(fn.right());
 | 
											
												
													
														|  |      }
 |  |      }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -    // -- Expressions (other)
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -    static Literal readLiteral(PlanStreamInput in) throws IOException {
 |  | 
 | 
											
												
													
														|  | -        Source source = Source.readFrom(in);
 |  | 
 | 
											
												
													
														|  | -        Object value = in.readGenericValue();
 |  | 
 | 
											
												
													
														|  | -        DataType dataType = DataType.readFrom(in);
 |  | 
 | 
											
												
													
														|  | -        return new Literal(source, mapToLiteralValue(in, dataType, value), dataType);
 |  | 
 | 
											
												
													
														|  | -    }
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -    static void writeLiteral(PlanStreamOutput out, Literal literal) throws IOException {
 |  | 
 | 
											
												
													
														|  | -        Source.EMPTY.writeTo(out);
 |  | 
 | 
											
												
													
														|  | -        out.writeGenericValue(mapFromLiteralValue(out, literal.dataType(), literal.value()));
 |  | 
 | 
											
												
													
														|  | -        out.writeString(literal.dataType().typeName());
 |  | 
 | 
											
												
													
														|  | -    }
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -    /**
 |  | 
 | 
											
												
													
														|  | -     * Not all literal values are currently supported in StreamInput/StreamOutput as generic values.
 |  | 
 | 
											
												
													
														|  | -     * This mapper allows for addition of new and interesting values without (yet) adding to StreamInput/Output.
 |  | 
 | 
											
												
													
														|  | -     * This makes the most sense during the pre-GA version of ESQL. When we get near GA we might want to push this down.
 |  | 
 | 
											
												
													
														|  | -     * <p>
 |  | 
 | 
											
												
													
														|  | -     * For the spatial point type support we need to care about the fact that 8.12.0 uses encoded longs for serializing
 |  | 
 | 
											
												
													
														|  | -     * while 8.13 uses WKB.
 |  | 
 | 
											
												
													
														|  | -     */
 |  | 
 | 
											
												
													
														|  | -    private static Object mapFromLiteralValue(PlanStreamOutput out, DataType dataType, Object value) {
 |  | 
 | 
											
												
													
														|  | -        if (dataType == GEO_POINT || dataType == CARTESIAN_POINT) {
 |  | 
 | 
											
												
													
														|  | -            // In 8.12.0 we serialized point literals as encoded longs, but now use WKB
 |  | 
 | 
											
												
													
														|  | -            if (out.getTransportVersion().before(TransportVersions.V_8_13_0)) {
 |  | 
 | 
											
												
													
														|  | -                if (value instanceof List<?> list) {
 |  | 
 | 
											
												
													
														|  | -                    return list.stream().map(v -> mapFromLiteralValue(out, dataType, v)).toList();
 |  | 
 | 
											
												
													
														|  | -                }
 |  | 
 | 
											
												
													
														|  | -                return wkbAsLong(dataType, (BytesRef) value);
 |  | 
 | 
											
												
													
														|  | -            }
 |  | 
 | 
											
												
													
														|  | -        }
 |  | 
 | 
											
												
													
														|  | -        return value;
 |  | 
 | 
											
												
													
														|  | -    }
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -    /**
 |  | 
 | 
											
												
													
														|  | -     * Not all literal values are currently supported in StreamInput/StreamOutput as generic values.
 |  | 
 | 
											
												
													
														|  | -     * This mapper allows for addition of new and interesting values without (yet) changing StreamInput/Output.
 |  | 
 | 
											
												
													
														|  | -     */
 |  | 
 | 
											
												
													
														|  | -    private static Object mapToLiteralValue(PlanStreamInput in, DataType dataType, Object value) {
 |  | 
 | 
											
												
													
														|  | -        if (dataType == GEO_POINT || dataType == CARTESIAN_POINT) {
 |  | 
 | 
											
												
													
														|  | -            // In 8.12.0 we serialized point literals as encoded longs, but now use WKB
 |  | 
 | 
											
												
													
														|  | -            if (in.getTransportVersion().before(TransportVersions.V_8_13_0)) {
 |  | 
 | 
											
												
													
														|  | -                if (value instanceof List<?> list) {
 |  | 
 | 
											
												
													
														|  | -                    return list.stream().map(v -> mapToLiteralValue(in, dataType, v)).toList();
 |  | 
 | 
											
												
													
														|  | -                }
 |  | 
 | 
											
												
													
														|  | -                return longAsWKB(dataType, (Long) value);
 |  | 
 | 
											
												
													
														|  | -            }
 |  | 
 | 
											
												
													
														|  | -        }
 |  | 
 | 
											
												
													
														|  | -        return value;
 |  | 
 | 
											
												
													
														|  | -    }
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -    private static BytesRef longAsWKB(DataType dataType, long encoded) {
 |  | 
 | 
											
												
													
														|  | -        return dataType == GEO_POINT ? GEO.longAsWkb(encoded) : CARTESIAN.longAsWkb(encoded);
 |  | 
 | 
											
												
													
														|  | -    }
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -    private static long wkbAsLong(DataType dataType, BytesRef wkb) {
 |  | 
 | 
											
												
													
														|  | -        return dataType == GEO_POINT ? GEO.wkbAsLong(wkb) : CARTESIAN.wkbAsLong(wkb);
 |  | 
 | 
											
												
													
														|  | -    }
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -    static Order readOrder(PlanStreamInput in) throws IOException {
 |  | 
 | 
											
												
													
														|  | -        return new org.elasticsearch.xpack.esql.expression.Order(
 |  | 
 | 
											
												
													
														|  | -            Source.readFrom(in),
 |  | 
 | 
											
												
													
														|  | -            in.readNamed(Expression.class),
 |  | 
 | 
											
												
													
														|  | -            in.readEnum(Order.OrderDirection.class),
 |  | 
 | 
											
												
													
														|  | -            in.readEnum(Order.NullsPosition.class)
 |  | 
 | 
											
												
													
														|  | -        );
 |  | 
 | 
											
												
													
														|  | -    }
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  | -    static void writeOrder(PlanStreamOutput out, Order order) throws IOException {
 |  | 
 | 
											
												
													
														|  | -        Source.EMPTY.writeTo(out);
 |  | 
 | 
											
												
													
														|  | -        out.writeExpression(order.child());
 |  | 
 | 
											
												
													
														|  | -        out.writeEnum(order.direction());
 |  | 
 | 
											
												
													
														|  | -        out.writeEnum(order.nullsPosition());
 |  | 
 | 
											
												
													
														|  | -    }
 |  | 
 | 
											
												
													
														|  | -
 |  | 
 | 
											
												
													
														|  |      // -- ancillary supporting classes of plan nodes, etc
 |  |      // -- ancillary supporting classes of plan nodes, etc
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      static EsQueryExec.FieldSort readFieldSort(PlanStreamInput in) throws IOException {
 |  |      static EsQueryExec.FieldSort readFieldSort(PlanStreamInput in) throws IOException {
 |