Browse Source

ESQL: Add type to layout (#99327)

We want it in a few places.
Nik Everett 2 years ago
parent
commit
443c53c636

+ 2 - 3
benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/EvalBenchmark.java

@@ -44,6 +44,7 @@ import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.Warmup;
 
 import java.time.Duration;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -133,9 +134,7 @@ public class EvalBenchmark {
 
     private static Layout layout(FieldAttribute... fields) {
         Layout.Builder layout = new Layout.Builder();
-        for (FieldAttribute field : fields) {
-            layout.appendChannel(field.id());
-        }
+        layout.append(Arrays.asList(fields));
         return layout.build();
     }
 

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/evaluator/EvalMapper.java

@@ -161,7 +161,7 @@ public final class EvalMapper {
                     return page.getBlock(channel);
                 }
             }
-            int channel = layout.getChannel(attr.id());
+            int channel = layout.get(attr.id()).channel();
             return () -> new Attribute(channel);
         }
     }

+ 12 - 12
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java

@@ -58,9 +58,9 @@ abstract class AbstractPhysicalOperationProviders implements PhysicalOperationPr
 
             // append channels to the layout
             if (mode == AggregateExec.Mode.FINAL) {
-                layout.appendChannels(aggregates);
+                layout.append(aggregates);
             } else {
-                layout.appendChannels(aggregateMapper.mapNonGrouping(aggregates));
+                layout.append(aggregateMapper.mapNonGrouping(aggregates));
             }
             // create the agg factories
             aggregatesToFactory(
@@ -87,8 +87,8 @@ abstract class AbstractPhysicalOperationProviders implements PhysicalOperationPr
                 if (groupAttribute == null) {
                     throw new EsqlIllegalArgumentException("Unexpected non-named expression[{}] as grouping in [{}]", group, aggregateExec);
                 }
-                Set<NameId> grpAttribIds = new HashSet<>();
-                grpAttribIds.add(groupAttribute.id());
+                Layout.ChannelSet groupAttributeLayout = new Layout.ChannelSet(new HashSet<>(), groupAttribute.dataType());
+                groupAttributeLayout.nameIds().add(groupAttribute.id());
 
                 /*
                  * Check for aliasing in aggregates which occurs in two cases (due to combining project + stats):
@@ -99,10 +99,9 @@ abstract class AbstractPhysicalOperationProviders implements PhysicalOperationPr
                     if (agg instanceof Alias a) {
                         if (a.child() instanceof Attribute attr) {
                             if (groupAttribute.id().equals(attr.id())) {
-                                grpAttribIds.add(a.id());
+                                groupAttributeLayout.nameIds().add(a.id());
                                 // TODO: investigate whether a break could be used since it shouldn't be possible to have multiple
-                                // attributes
-                                // pointing to the same attribute
+                                // attributes pointing to the same attribute
                             }
                             // partial mode only
                             // check if there's any alias used in grouping - no need for the final reduction since the intermediate data
@@ -117,18 +116,19 @@ abstract class AbstractPhysicalOperationProviders implements PhysicalOperationPr
                         }
                     }
                 }
-                layout.appendChannel(grpAttribIds);
-                groupSpecs.add(new GroupSpec(source.layout.getChannel(groupAttribute.id()), groupAttribute));
+                layout.append(groupAttributeLayout);
+                Layout.ChannelAndType groupInput = source.layout.get(groupAttribute.id());
+                groupSpecs.add(new GroupSpec(groupInput == null ? null : groupInput.channel(), groupAttribute));
             }
 
             if (mode == AggregateExec.Mode.FINAL) {
                 for (var agg : aggregates) {
                     if (agg instanceof Alias alias && alias.child() instanceof AggregateFunction) {
-                        layout.appendChannel(alias.id());
+                        layout.append(alias);
                     }
                 }
             } else {
-                layout.appendChannels(aggregateMapper.mapGrouping(aggregates));
+                layout.append(aggregateMapper.mapGrouping(aggregates));
             }
 
             // create the agg factories
@@ -252,7 +252,7 @@ abstract class AbstractPhysicalOperationProviders implements PhysicalOperationPr
                         params[i] = aggParams.get(i).fold();
                     }
 
-                    List<Integer> inputChannels = sourceAttr.stream().map(NamedExpression::id).map(layout::getChannel).toList();
+                    List<Integer> inputChannels = sourceAttr.stream().map(attr -> layout.get(attr.id()).channel()).toList();
                     assert inputChannels != null && inputChannels.size() > 0 && inputChannels.stream().allMatch(i -> i >= 0);
                     if (aggregateFunction instanceof ToAggregator agg) {
                         consumer.accept(new AggFunctionSupplierContext(agg.supplier(bigArrays, inputChannels), aggMode));

+ 62 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/DefaultLayout.java

@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.planner;
+
+import org.elasticsearch.xpack.ql.expression.NameId;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+class DefaultLayout implements Layout {
+    private final Map<NameId, ChannelAndType> layout;
+    private final int numberOfChannels;
+
+    DefaultLayout(Map<NameId, ChannelAndType> layout, int numberOfChannels) {
+        this.layout = layout;
+        this.numberOfChannels = numberOfChannels;
+    }
+
+    @Override
+    public ChannelAndType get(NameId id) {
+        return layout.get(id);
+    }
+
+    /**
+     * @return the total number of channels in the layout.
+     */
+    @Override
+    public int numberOfChannels() {
+        return numberOfChannels;
+    }
+
+    @Override
+    public Map<Integer, Set<NameId>> inverse() {
+        Map<Integer, Set<NameId>> inverse = new HashMap<>();
+        for (Map.Entry<NameId, ChannelAndType> entry : layout.entrySet()) {
+            NameId key = entry.getKey();
+            Integer value = entry.getValue().channel();
+            inverse.computeIfAbsent(value, k -> new HashSet<>()).add(key);
+        }
+        return inverse;
+    }
+
+    /**
+     * @return creates a builder to append to this layout.
+     */
+    @Override
+    public Layout.Builder builder() {
+        return new Builder(numberOfChannels, layout);
+    }
+
+    @Override
+    public String toString() {
+        return "Layout{" + "layout=" + layout + ", numberOfChannels=" + numberOfChannels + '}';
+    }
+}

+ 4 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

@@ -61,7 +61,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
 
         PhysicalOperation op = source;
         for (Attribute attr : fieldExtractExec.attributesToExtract()) {
-            layout.appendChannel(attr.id());
+            layout.append(attr);
             Layout previousLayout = op.layout;
 
             var sources = ValueSources.sources(
@@ -71,7 +71,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
                 LocalExecutionPlanner.toElementType(attr.dataType())
             );
 
-            int docChannel = previousLayout.getChannel(sourceAttr.id());
+            int docChannel = previousLayout.get(sourceAttr.id()).channel();
 
             op = op.with(
                 new ValuesSourceReaderOperator.ValuesSourceReaderOperatorFactory(sources, docChannel, attr.name()),
@@ -137,9 +137,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
             );
         }
         Layout.Builder layout = new Layout.Builder();
-        for (int i = 0; i < esQueryExec.output().size(); i++) {
-            layout.appendChannel(esQueryExec.output().get(i).id());
-        }
+        layout.append(esQueryExec.output());
         int instanceCount = Math.max(1, luceneFactory.taskConcurrency());
         context.driverParallelism(new DriverParallelism(DriverParallelism.Type.DATA_PARALLELISM, instanceCount));
         return PhysicalOperation.fromSource(luceneFactory, layout.build());
@@ -155,7 +153,7 @@ public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProvi
         LocalExecutionPlannerContext context
     ) {
         var sourceAttribute = FieldExtractExec.extractSourceAttributesFrom(aggregateExec.child());
-        int docChannel = source.layout.getChannel(sourceAttribute.id());
+        int docChannel = source.layout.get(sourceAttribute.id()).channel();
         // The grouping-by values are ready, let's group on them directly.
         // Costin: why are they ready and not already exposed in the layout?
         return new OrdinalsGroupingOperator.OrdinalsGroupingOperatorFactory(

+ 23 - 24
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/ExchangeLayout.java

@@ -7,43 +7,32 @@
 
 package org.elasticsearch.xpack.esql.planner;
 
-import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.xpack.ql.expression.NameId;
 
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import static java.util.Collections.emptyMap;
-
 /**
  * Decorating layout that creates the NameId -> Value lazily based on the calls made to its content.
  * Essentially it maps the existing (old) NameIds to the new ones.
  */
-class ExchangeLayout extends Layout {
-
-    private final Map<NameId, Integer> delegate;
+class ExchangeLayout implements Layout {
+    private final Layout delegate;
     private final Map<Integer, Set<NameId>> inverse;
     private final Map<NameId, NameId> mappingToOldLayout;
     private int counter;
 
-    ExchangeLayout(Layout layout) {
-        super(emptyMap(), 0);
-        this.delegate = layout.internalLayout();
-        this.mappingToOldLayout = Maps.newMapWithExpectedSize(delegate.size());
-        this.inverse = Maps.newMapWithExpectedSize(delegate.size());
-
-        for (Map.Entry<NameId, Integer> entry : delegate.entrySet()) {
-            NameId key = entry.getKey();
-            Integer value = entry.getValue();
-            inverse.computeIfAbsent(value, k -> new HashSet<>()).add(key);
-        }
+    ExchangeLayout(Layout delegate) {
+        this.delegate = delegate;
+        this.inverse = delegate.inverse();
+        this.mappingToOldLayout = new HashMap<>(inverse.size());
     }
 
     @Override
-    public Integer getChannel(NameId id) {
+    public ChannelAndType get(NameId id) {
         var oldId = mappingToOldLayout.get(id);
-        if (oldId == null && counter < delegate.size()) {
+        if (oldId == null && counter < inverse.size()) {
             var names = inverse.get(counter++);
             for (var name : names) {
                 oldId = name;
@@ -54,12 +43,22 @@ class ExchangeLayout extends Layout {
     }
 
     @Override
-    public int numberOfIds() {
-        return delegate.size();
+    public int numberOfChannels() {
+        return delegate.numberOfChannels();
     }
 
     @Override
-    public int numberOfChannels() {
-        return inverse.size();
+    public String toString() {
+        return "ExchangeLayout{" + delegate + '}';
+    }
+
+    @Override
+    public Builder builder() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<Integer, Set<NameId>> inverse() {
+        throw new UnsupportedOperationException();
     }
 }

+ 55 - 64
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/Layout.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.planner;
 
 import org.elasticsearch.xpack.ql.expression.NameId;
 import org.elasticsearch.xpack.ql.expression.NamedExpression;
+import org.elasticsearch.xpack.ql.type.DataType;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -18,119 +19,109 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * Maintains the mapping from attribute ids to channels (block index).
  *
  * An attribute can only be mapped to exactly one channel but one channel can be mapped to multiple attributes.
  */
-public class Layout {
-
-    private final Map<NameId, Integer> layout;
-    private final int numberOfChannels;
-
-    Layout(Map<NameId, Integer> layout, int numberOfChannels) {
-        this.layout = layout;
-        this.numberOfChannels = numberOfChannels;
-    }
+public interface Layout {
+    /**
+     * The values stored in the {@link Layout}, a channel id and a {@link DataType}.
+     */
+    record ChannelAndType(int channel, DataType type) {}
 
     /**
-     * @param id the attribute id
-     * @return the channel to which the specific attribute id is mapped or `null` if the attribute id does not exist in the layout.
+     * A part of an "inverse" layout, a {@link Set} or {@link NameId}s and a {@link DataType}.
      */
-    public Integer getChannel(NameId id) {
-        return layout.get(id);
-    }
+    record ChannelSet(Set<NameId> nameIds, DataType type) {}
 
     /**
-     * @return the total number of ids in the layout.
+     * @param id the attribute id
+     * @return the channel to which the specific attribute id is mapped or `null` if the attribute id does not exist in the layout.
      */
-    public int numberOfIds() {
-        return layout.size();
-    }
+    ChannelAndType get(NameId id);
 
     /**
      * @return the total number of channels in the layout.
      */
-    public int numberOfChannels() {
-        return numberOfChannels;
-    }
-
-    Map<NameId, Integer> internalLayout() {
-        return layout;
-    }
+    int numberOfChannels();
 
     /**
      * @return creates a builder to append to this layout.
      */
-    public Layout.Builder builder() {
-        return new Layout.Builder(this);
-    }
+    Layout.Builder builder();
 
-    @Override
-    public String toString() {
-        return "BlockLayout{" + "layout=" + layout + ", numberOfChannels=" + numberOfChannels + '}';
-    }
+    Map<Integer, Set<NameId>> inverse();
 
     /**
      * Builder class for Layout. The builder ensures that layouts cannot be altered after creation (through references to the underlying
      * map).
      */
-    public static class Builder {
+    class Builder {
+        private final List<ChannelSet> channels = new ArrayList<>();
 
-        private final List<Set<NameId>> channels;
+        public Builder() {}
 
-        public Builder() {
-            this.channels = new ArrayList<>();
-        }
-
-        private Builder(Layout layout) {
-            channels = IntStream.range(0, layout.numberOfChannels).<Set<NameId>>mapToObj(i -> new HashSet<>()).collect(Collectors.toList());
-            for (Map.Entry<NameId, Integer> entry : layout.layout.entrySet()) {
-                channels.get(entry.getValue()).add(entry.getKey());
+        Builder(int numberOfChannels, Map<NameId, ChannelAndType> layout) {
+            for (int i = 0; i < numberOfChannels; i++) {
+                channels.add(null);
+            }
+            for (Map.Entry<NameId, ChannelAndType> entry : layout.entrySet()) {
+                ChannelSet set = channels.get(entry.getValue().channel);
+                if (set == null) {
+                    set = new ChannelSet(new HashSet<>(), entry.getValue().type());
+                    channels.set(entry.getValue().channel, set);
+                } else {
+                    if (set.type != entry.getValue().type()) {
+                        throw new IllegalArgumentException();
+                    }
+                }
+                set.nameIds.add(entry.getKey());
             }
         }
 
         /**
-         * Appends a new channel to the layout. The channel is mapped to a single attribute id.
-         * @param id the attribute id
+         * Appends a new channel to the layout. The channel is mapped to one or more attribute ids.
          */
-        public Builder appendChannel(NameId id) {
-            channels.add(Set.of(id));
+        public Builder append(ChannelSet set) {
+            if (set.nameIds.size() < 1) {
+                throw new IllegalArgumentException("Channel must be mapped to at least one id.");
+            }
+            channels.add(set);
             return this;
         }
 
         /**
-         * Appends a new channel to the layout. The channel is mapped to one or more attribute ids.
-         * @param ids the attribute ids
+         * Appends a new channel to the layout. The channel is mapped to a single attribute id.
          */
-        public Builder appendChannel(Set<NameId> ids) {
-            if (ids.size() < 1) {
-                throw new IllegalArgumentException("Channel must be mapped to at least one id.");
-            }
-            channels.add(ids);
-            return this;
+        public Builder append(NamedExpression attribute) {
+            return append(new ChannelSet(Set.of(attribute.id()), attribute.dataType()));
         }
 
-        public Builder appendChannels(Collection<? extends NamedExpression> attributes) {
-            for (var attribute : attributes) {
-                appendChannel(attribute.id());
+        /**
+         * Appends many new channels to the layout. Each channel is mapped to a single attribute id.
+         */
+        public Builder append(Collection<? extends NamedExpression> attributes) {
+            for (NamedExpression attribute : attributes) {
+                append(new ChannelSet(Set.of(attribute.id()), attribute.dataType()));
             }
             return this;
         }
 
+        /**
+         * Build a new {@link Layout}.
+         */
         public Layout build() {
-            Map<NameId, Integer> layout = new HashMap<>();
+            Map<NameId, ChannelAndType> layout = new HashMap<>();
             int numberOfChannels = 0;
-            for (Set<NameId> ids : this.channels) {
+            for (ChannelSet set : channels) {
                 int channel = numberOfChannels++;
-                for (NameId id : ids) {
-                    layout.putIfAbsent(id, channel);
+                for (NameId id : set.nameIds) {
+                    layout.putIfAbsent(id, new ChannelAndType(channel, set.type));
                 }
             }
-            return new Layout(Collections.unmodifiableMap(layout), numberOfChannels);
+            return new DefaultLayout(Collections.unmodifiableMap(layout), numberOfChannels);
         }
     }
 }

+ 25 - 37
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

@@ -281,7 +281,7 @@ public class LocalExecutionPlanner {
         int index = -1;
         boolean transformRequired = false;
         for (var attribute : attrs) {
-            mappedPosition[++index] = layout.getChannel(attribute.id());
+            mappedPosition[++index] = layout.get(attribute.id()).channel();
             transformRequired |= mappedPosition[index] != index;
         }
         Function<Page, Page> transformer = transformRequired ? p -> {
@@ -314,9 +314,7 @@ public class LocalExecutionPlanner {
         Objects.requireNonNull(exchangeSourceHandler, "ExchangeSourceHandler wasn't provided");
 
         var builder = new Layout.Builder();
-        for (var attr : exchangeSource.output()) {
-            builder.appendChannel(attr.id());
-        }
+        builder.append(exchangeSource.output());
         // decorate the layout
         var l = builder.build();
         var layout = exchangeSource.isIntermediateAgg() ? new ExchangeLayout(l) : l;
@@ -330,7 +328,7 @@ public class LocalExecutionPlanner {
         List<TopNOperator.SortOrder> orders = topNExec.order().stream().map(order -> {
             int sortByChannel;
             if (order.child() instanceof Attribute a) {
-                sortByChannel = source.layout.getChannel(a.id());
+                sortByChannel = source.layout.get(a.id()).channel();
             } else {
                 throw new EsqlIllegalArgumentException("order by expression must be an attribute");
             }
@@ -386,7 +384,7 @@ public class LocalExecutionPlanner {
             Supplier<ExpressionEvaluator> evaluatorSupplier;
             evaluatorSupplier = EvalMapper.toEvaluator(field.child(), source.layout);
             Layout.Builder layout = source.layout.builder();
-            layout.appendChannel(field.toAttribute().id());
+            layout.append(field.toAttribute());
             source = source.with(new EvalOperatorFactory(evaluatorSupplier), layout.build());
         }
         return source;
@@ -395,9 +393,7 @@ public class LocalExecutionPlanner {
     private PhysicalOperation planDissect(DissectExec dissect, LocalExecutionPlannerContext context) {
         PhysicalOperation source = plan(dissect.child(), context);
         Layout.Builder layoutBuilder = source.layout.builder();
-        for (Attribute attr : dissect.extractedFields()) {
-            layoutBuilder.appendChannel(attr.id());
-        }
+        layoutBuilder.append(dissect.extractedFields());
         final Expression expr = dissect.inputExpression();
         String[] attributeNames = Expressions.names(dissect.extractedFields()).toArray(new String[0]);
 
@@ -417,10 +413,7 @@ public class LocalExecutionPlanner {
         PhysicalOperation source = plan(grok.child(), context);
         Layout.Builder layoutBuilder = source.layout.builder();
         List<Attribute> extractedFields = grok.extractedFields();
-        for (Attribute attr : extractedFields) {
-            layoutBuilder.appendChannel(attr.id());
-        }
-
+        layoutBuilder.append(extractedFields);
         Map<String, Integer> fieldToPos = new HashMap<>(extractedFields.size());
         Map<String, ElementType> fieldToType = new HashMap<>(extractedFields.size());
         ElementType[] types = new ElementType[extractedFields.size()];
@@ -447,10 +440,7 @@ public class LocalExecutionPlanner {
     private PhysicalOperation planEnrich(EnrichExec enrich, LocalExecutionPlannerContext context) {
         PhysicalOperation source = plan(enrich.child(), context);
         Layout.Builder layoutBuilder = source.layout.builder();
-        List<NamedExpression> extractedFields = enrich.enrichFields();
-        for (NamedExpression attr : extractedFields) {
-            layoutBuilder.appendChannel(attr.id());
-        }
+        layoutBuilder.append(enrich.enrichFields());
         Layout layout = layoutBuilder.build();
         Set<String> indices = enrich.enrichIndex().concreteIndices();
         if (indices.size() != 1) {
@@ -462,7 +452,7 @@ public class LocalExecutionPlanner {
                 sessionId,
                 parentTask,
                 1, // TODO: Add a concurrent setting for enrich - also support unordered mode
-                source.layout.getChannel(enrich.matchField().id()),
+                source.layout.get(enrich.matchField().id()).channel(),
                 enrichLookupService,
                 enrichIndex,
                 "match", // TODO: enrich should also resolve the match_type
@@ -480,20 +470,13 @@ public class LocalExecutionPlanner {
     private PhysicalOperation planRow(RowExec row, LocalExecutionPlannerContext context) {
         List<Object> obj = row.fields().stream().map(f -> f.child().fold()).toList();
         Layout.Builder layout = new Layout.Builder();
-        var output = row.output();
-        for (Attribute attribute : output) {
-            layout.appendChannel(attribute.id());
-        }
+        layout.append(row.output());
         return PhysicalOperation.fromSource(new RowOperatorFactory(obj), layout.build());
     }
 
     private PhysicalOperation planLocal(LocalSourceExec localSourceExec, LocalExecutionPlannerContext context) {
-
         Layout.Builder layout = new Layout.Builder();
-        var output = localSourceExec.output();
-        for (Attribute attribute : output) {
-            layout.appendChannel(attribute.id());
-        }
+        layout.append(localSourceExec.output());
         LocalSourceOperator.BlockSupplier supplier = () -> localSourceExec.supplier().get();
         var operator = new LocalSourceOperator(supplier);
         return PhysicalOperation.fromSource(new LocalSourceFactory(() -> operator), layout.build());
@@ -501,16 +484,14 @@ public class LocalExecutionPlanner {
 
     private PhysicalOperation planShow(ShowExec showExec) {
         Layout.Builder layout = new Layout.Builder();
-        for (var attribute : showExec.output()) {
-            layout.appendChannel(attribute.id());
-        }
+        layout.append(showExec.output());
         return PhysicalOperation.fromSource(new ShowOperator.ShowOperatorFactory(showExec.values()), layout.build());
     }
 
     private PhysicalOperation planProject(ProjectExec project, LocalExecutionPlannerContext context) {
         var source = plan(project.child(), context);
 
-        Map<Integer, Set<NameId>> inputChannelToOutputIds = new HashMap<>();
+        Map<Integer, Layout.ChannelSet> inputChannelToOutputIds = new HashMap<>();
         for (NamedExpression ne : project.projections()) {
             NameId inputId;
             if (ne instanceof Alias a) {
@@ -518,19 +499,26 @@ public class LocalExecutionPlanner {
             } else {
                 inputId = ne.id();
             }
-            int inputChannel = source.layout.getChannel(inputId);
-            inputChannelToOutputIds.computeIfAbsent(inputChannel, ignore -> new HashSet<>()).add(ne.id());
+            Layout.ChannelAndType input = source.layout.get(inputId);
+            Layout.ChannelSet channelSet = inputChannelToOutputIds.computeIfAbsent(
+                input.channel(),
+                ignore -> new Layout.ChannelSet(new HashSet<>(), input.type())
+            );
+            if (channelSet.type() != input.type()) {
+                throw new IllegalArgumentException("type mismatch for aliases");
+            }
+            channelSet.nameIds().add(ne.id());
         }
 
         BitSet mask = new BitSet();
         Layout.Builder layout = new Layout.Builder();
 
         for (int inChannel = 0; inChannel < source.layout.numberOfChannels(); inChannel++) {
-            Set<NameId> outputIds = inputChannelToOutputIds.get(inChannel);
+            Layout.ChannelSet outputSet = inputChannelToOutputIds.get(inChannel);
 
-            if (outputIds != null) {
+            if (outputSet != null) {
                 mask.set(inChannel);
-                layout.appendChannel(outputIds);
+                layout.append(outputSet);
             }
         }
 
@@ -555,7 +543,7 @@ public class LocalExecutionPlanner {
 
     private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecutionPlannerContext context) {
         PhysicalOperation source = plan(mvExpandExec.child(), context);
-        return source.with(new MvExpandOperator.Factory(source.layout.getChannel(mvExpandExec.target().id())), source.layout);
+        return source.with(new MvExpandOperator.Factory(source.layout.get(mvExpandExec.target().id()).channel()), source.layout);
     }
 
     /**

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractFunctionTestCase.java

@@ -149,7 +149,7 @@ public abstract class AbstractFunctionTestCase extends ESTestCase {
      */
     protected void buildLayout(Layout.Builder builder, Expression e) {
         if (e instanceof FieldAttribute f) {
-            builder.appendChannel(f.id());
+            builder.append(f);
             return;
         }
         for (Expression c : e.children()) {

+ 4 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java

@@ -121,10 +121,10 @@ public class EvalMapperTests extends ESTestCase {
 
     public void testEvaluatorSuppliers() {
         Layout.Builder lb = new Layout.Builder();
-        lb.appendChannel(DOUBLE1.id());
-        lb.appendChannel(DOUBLE2.id());
-        lb.appendChannel(DATE.id());
-        lb.appendChannel(LONG.id());
+        lb.append(DOUBLE1);
+        lb.append(DOUBLE2);
+        lb.append(DATE);
+        lb.append(LONG);
         Layout layout = lb.build();
 
         Supplier<EvalOperator.ExpressionEvaluator> supplier = EvalMapper.toEvaluator(expression, layout);

+ 2 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java

@@ -52,7 +52,7 @@ public class TestPhysicalOperationProviders extends AbstractPhysicalOperationPro
         Layout.Builder layout = source.layout.builder();
         PhysicalOperation op = source;
         for (Attribute attr : fieldExtractExec.attributesToExtract()) {
-            layout.appendChannel(attr.id());
+            layout.append(attr);
             op = op.with(new TestFieldExtractOperatorFactory(attr.name()), layout.build());
         }
         return op;
@@ -61,9 +61,7 @@ public class TestPhysicalOperationProviders extends AbstractPhysicalOperationPro
     @Override
     public PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, LocalExecutionPlannerContext context) {
         Layout.Builder layout = new Layout.Builder();
-        for (int i = 0; i < esQueryExec.output().size(); i++) {
-            layout.appendChannel(esQueryExec.output().get(i).id());
-        }
+        layout.append(esQueryExec.output());
         return PhysicalOperation.fromSource(new TestSourceOperatorFactory(), layout.build());
     }