浏览代码

ES|QL: improve serialization of FieldAttributes (#111447)

Attributes are shared multiple times in query plans. In addition, multiple FieldAttributes can share the same parent FieldAttribute.

Send each Attribute instance only once; the second time, send a serialization ID that will be used to cache and retrieve the instance.
Luigi Dell'Aquila 1 年之前
父节点
当前提交
c4235d56cf
共有 14 个文件被更改,包括 394 次插入46 次删除
  1. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  2. 19 12
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/FieldAttribute.java
  3. 17 10
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/MetadataAttribute.java
  4. 16 9
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/ReferenceAttribute.java
  5. 11 0
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/PlanStreamInput.java
  6. 24 0
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/PlanStreamOutput.java
  7. 16 9
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/UnsupportedAttribute.java
  8. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java
  9. 49 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java
  10. 58 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java
  11. 3 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractAttributeTestCase.java
  12. 4 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MetadataAttributeTests.java
  13. 4 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/UnsupportedAttributeTests.java
  14. 171 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutputTests.java

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -182,6 +182,7 @@ public class TransportVersions {
     public static final TransportVersion ML_ADD_DETECTION_RULE_PARAMS = def(8_712_00_0);
     public static final TransportVersion FIX_VECTOR_SIMILARITY_INNER_HITS = def(8_713_00_0);
     public static final TransportVersion INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN = def(8_714_00_0);
+    public static final TransportVersion ESQL_ATTRIBUTE_CACHED_SERIALIZATION = def(8_715_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 19 - 12
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/FieldAttribute.java

@@ -15,6 +15,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.EsField;
 import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
+import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 import org.elasticsearch.xpack.esql.core.util.StringUtils;
 
 import java.io.IOException;
@@ -36,7 +37,7 @@ public class FieldAttribute extends TypedAttribute {
     static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
         Attribute.class,
         "FieldAttribute",
-        FieldAttribute::new
+        FieldAttribute::readFrom
     );
 
     private final FieldAttribute parent;
@@ -81,7 +82,7 @@ public class FieldAttribute extends TypedAttribute {
         this.field = field;
     }
 
-    public FieldAttribute(StreamInput in) throws IOException {
+    private FieldAttribute(StreamInput in) throws IOException {
         /*
          * The funny casting dance with `(StreamInput & PlanStreamInput) in` is required
          * because we're in esql-core here and the real PlanStreamInput is in
@@ -92,7 +93,7 @@ public class FieldAttribute extends TypedAttribute {
          */
         this(
             Source.readFrom((StreamInput & PlanStreamInput) in),
-            in.readOptionalWriteable(FieldAttribute::new),
+            in.readOptionalWriteable(FieldAttribute::readFrom),
             in.readString(),
             DataType.readFrom(in),
             in.readNamedWriteable(EsField.class),
@@ -105,15 +106,21 @@ public class FieldAttribute extends TypedAttribute {
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        Source.EMPTY.writeTo(out);
-        out.writeOptionalWriteable(parent);
-        out.writeString(name());
-        dataType().writeTo(out);
-        out.writeNamedWriteable(field);
-        out.writeOptionalString(qualifier());
-        out.writeEnum(nullable());
-        id().writeTo(out);
-        out.writeBoolean(synthetic());
+        if (((PlanStreamOutput) out).writeAttributeCacheHeader(this)) {
+            Source.EMPTY.writeTo(out);
+            out.writeOptionalWriteable(parent);
+            out.writeString(name());
+            dataType().writeTo(out);
+            out.writeNamedWriteable(field);
+            out.writeOptionalString(qualifier());
+            out.writeEnum(nullable());
+            id().writeTo(out);
+            out.writeBoolean(synthetic());
+        }
+    }
+
+    public static FieldAttribute readFrom(StreamInput in) throws IOException {
+        return ((PlanStreamInput) in).readAttributeWithCache(FieldAttribute::new);
     }
 
     @Override

+ 17 - 10
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/MetadataAttribute.java

@@ -19,6 +19,7 @@ import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
+import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 
 import java.io.IOException;
 import java.util.Map;
@@ -33,7 +34,7 @@ public class MetadataAttribute extends TypedAttribute {
     static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
         Attribute.class,
         "MetadataAttribute",
-        MetadataAttribute::new
+        MetadataAttribute::readFrom
     );
 
     private static final Map<String, Tuple<DataType, Boolean>> ATTRIBUTES_MAP = Map.of(
@@ -72,7 +73,7 @@ public class MetadataAttribute extends TypedAttribute {
     }
 
     @SuppressWarnings("unchecked")
-    public MetadataAttribute(StreamInput in) throws IOException {
+    private MetadataAttribute(StreamInput in) throws IOException {
         /*
          * The funny casting dance with `(StreamInput & PlanStreamInput) in` is required
          * because we're in esql-core here and the real PlanStreamInput is in
@@ -95,14 +96,20 @@ public class MetadataAttribute extends TypedAttribute {
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        Source.EMPTY.writeTo(out);
-        out.writeString(name());
-        dataType().writeTo(out);
-        out.writeOptionalString(qualifier());
-        out.writeEnum(nullable());
-        id().writeTo(out);
-        out.writeBoolean(synthetic());
-        out.writeBoolean(searchable);
+        if (((PlanStreamOutput) out).writeAttributeCacheHeader(this)) {
+            Source.EMPTY.writeTo(out);
+            out.writeString(name());
+            dataType().writeTo(out);
+            out.writeOptionalString(qualifier());
+            out.writeEnum(nullable());
+            id().writeTo(out);
+            out.writeBoolean(synthetic());
+            out.writeBoolean(searchable);
+        }
+    }
+
+    public static MetadataAttribute readFrom(StreamInput in) throws IOException {
+        return ((PlanStreamInput) in).readAttributeWithCache(MetadataAttribute::new);
     }
 
     @Override

+ 16 - 9
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/ReferenceAttribute.java

@@ -13,6 +13,7 @@ import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
+import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 
 import java.io.IOException;
 
@@ -23,7 +24,7 @@ public class ReferenceAttribute extends TypedAttribute {
     static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
         Attribute.class,
         "ReferenceAttribute",
-        ReferenceAttribute::new
+        ReferenceAttribute::readFrom
     );
 
     public ReferenceAttribute(Source source, String name, DataType dataType) {
@@ -43,7 +44,7 @@ public class ReferenceAttribute extends TypedAttribute {
     }
 
     @SuppressWarnings("unchecked")
-    public ReferenceAttribute(StreamInput in) throws IOException {
+    private ReferenceAttribute(StreamInput in) throws IOException {
         /*
          * The funny casting dance with `(StreamInput & PlanStreamInput) in` is required
          * because we're in esql-core here and the real PlanStreamInput is in
@@ -65,13 +66,19 @@ public class ReferenceAttribute extends TypedAttribute {
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        Source.EMPTY.writeTo(out);
-        out.writeString(name());
-        dataType().writeTo(out);
-        out.writeOptionalString(qualifier());
-        out.writeEnum(nullable());
-        id().writeTo(out);
-        out.writeBoolean(synthetic());
+        if (((PlanStreamOutput) out).writeAttributeCacheHeader(this)) {
+            Source.EMPTY.writeTo(out);
+            out.writeString(name());
+            dataType().writeTo(out);
+            out.writeOptionalString(qualifier());
+            out.writeEnum(nullable());
+            id().writeTo(out);
+            out.writeBoolean(synthetic());
+        }
+    }
+
+    public static ReferenceAttribute readFrom(StreamInput in) throws IOException {
+        return ((PlanStreamInput) in).readAttributeWithCache(ReferenceAttribute::new);
     }
 
     @Override

+ 11 - 0
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/PlanStreamInput.java

@@ -7,6 +7,9 @@
 
 package org.elasticsearch.xpack.esql.core.util;
 
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.core.CheckedFunction;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.NameId;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 
@@ -33,4 +36,12 @@ public interface PlanStreamInput {
      * the same result.
      */
     NameId mapNameId(long id) throws IOException;
+
+    /**
+     * Reads an Attribute using the attribute cache.
+     * @param constructor the constructor needed to build the actual attribute when read from the wire
+     * @return An attribute; this will generally be the same type as the provided constructor
+     * @throws IOException
+     */
+    <A extends Attribute> A readAttributeWithCache(CheckedFunction<StreamInput, A, IOException> constructor) throws IOException;
 }

+ 24 - 0
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/PlanStreamOutput.java

@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.core.util;
+
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+
+import java.io.IOException;
+
+public interface PlanStreamOutput {
+
+    /**
+     * Writes a cache header for an {@link Attribute} and caches it if it is not already in the cache.
+     * In that case, the attribute will have to serialize itself into this stream immediately after this method call.
+     * @param attribute The attribute to serialize
+     * @return true if the attribute needs to serialize itself, false otherwise (i.e. if already cached)
+     * @throws IOException
+     */
+    boolean writeAttributeCacheHeader(Attribute attribute) throws IOException;
+}

+ 16 - 9
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/UnsupportedAttribute.java

@@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.UnsupportedEsField;
+import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 
 import java.io.IOException;
@@ -36,17 +37,17 @@ public final class UnsupportedAttribute extends FieldAttribute implements Unreso
     public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
         Attribute.class,
         "UnsupportedAttribute",
-        UnsupportedAttribute::new
+        UnsupportedAttribute::readFrom
     );
     public static final NamedWriteableRegistry.Entry NAMED_EXPRESSION_ENTRY = new NamedWriteableRegistry.Entry(
         NamedExpression.class,
         ENTRY.name,
-        UnsupportedAttribute::new
+        UnsupportedAttribute::readFrom
     );
     public static final NamedWriteableRegistry.Entry EXPRESSION_ENTRY = new NamedWriteableRegistry.Entry(
         Expression.class,
         ENTRY.name,
-        UnsupportedAttribute::new
+        UnsupportedAttribute::readFrom
     );
 
     private final String message;
@@ -70,7 +71,7 @@ public final class UnsupportedAttribute extends FieldAttribute implements Unreso
         this.message = customMessage == null ? errorMessage(qualifiedName(), field) : customMessage;
     }
 
-    public UnsupportedAttribute(StreamInput in) throws IOException {
+    private UnsupportedAttribute(StreamInput in) throws IOException {
         this(
             Source.readFrom((PlanStreamInput) in),
             in.readString(),
@@ -82,11 +83,17 @@ public final class UnsupportedAttribute extends FieldAttribute implements Unreso
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        Source.EMPTY.writeTo(out);
-        out.writeString(name());
-        field().writeTo(out);
-        out.writeOptionalString(hasCustomMessage ? message : null);
-        id().writeTo(out);
+        if (((PlanStreamOutput) out).writeAttributeCacheHeader(this)) {
+            Source.EMPTY.writeTo(out);
+            out.writeString(name());
+            field().writeTo(out);
+            out.writeOptionalString(hasCustomMessage ? message : null);
+            id().writeTo(out);
+        }
+    }
+
+    public static UnsupportedAttribute readFrom(StreamInput in) throws IOException {
+        return ((PlanStreamInput) in).readAttributeWithCache(UnsupportedAttribute::new);
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

@@ -752,7 +752,7 @@ public final class PlanNamedTypes {
 
     static EsQueryExec.FieldSort readFieldSort(PlanStreamInput in) throws IOException {
         return new EsQueryExec.FieldSort(
-            new FieldAttribute(in),
+            FieldAttribute.readFrom(in),
             in.readEnum(Order.OrderDirection.class),
             in.readEnum(Order.NullsPosition.class)
         );

+ 49 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.xpack.esql.io.stream;
 
+import org.apache.lucene.util.ArrayUtil;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
 import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
@@ -21,8 +23,10 @@ import org.elasticsearch.compute.data.BooleanBigArrayBlock;
 import org.elasticsearch.compute.data.DoubleBigArrayBlock;
 import org.elasticsearch.compute.data.IntBigArrayBlock;
 import org.elasticsearch.compute.data.LongBigArrayBlock;
+import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.xpack.esql.Column;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.NameId;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanNamedReader;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanReader;
@@ -60,6 +64,8 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput
 
     private final Map<Integer, Block> cachedBlocks = new HashMap<>();
 
+    private Attribute[] attributesCache = new Attribute[64];
+
     private final PlanNameRegistry registry;
 
     // hook for nameId, where can cache and map, for now just return a NameId of the same long value.
@@ -206,4 +212,47 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput
     public NameId mapNameId(long l) {
         return nameIdFunction.apply(l);
     }
+
+    /**
+     * @param constructor the constructor needed to build the actual attribute when read from the wire
+     * @throws IOException
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <A extends Attribute> A readAttributeWithCache(CheckedFunction<StreamInput, A, IOException> constructor) throws IOException {
+        if (getTransportVersion().onOrAfter(TransportVersions.ESQL_ATTRIBUTE_CACHED_SERIALIZATION)) {
+            // it's safe to cast to int, since the max value for this is {@link PlanStreamOutput#MAX_SERIALIZED_ATTRIBUTES}
+            int cacheId = Math.toIntExact(readZLong());
+            if (cacheId < 0) {
+                cacheId = -1 - cacheId;
+                Attribute result = constructor.apply(this);
+                cacheAttribute(cacheId, result);
+                return (A) result;
+            } else {
+                return (A) attributeFromCache(cacheId);
+            }
+        } else {
+            return constructor.apply(this);
+        }
+    }
+
+    private Attribute attributeFromCache(int id) throws IOException {
+        if (attributesCache[id] == null) {
+            throw new IOException("Attribute ID not found in serialization cache [" + id + "]");
+        }
+        return attributesCache[id];
+    }
+
+    /**
+     * Add an attribute to the cache, based on the serialization ID generated by {@link PlanStreamOutput}
+     * @param id The ID that will reference the attribute. Generated at serialization time
+     * @param attr The attribute to cache
+     */
+    private void cacheAttribute(int id, Attribute attr) {
+        assert id >= 0;
+        if (id >= attributesCache.length) {
+            attributesCache = ArrayUtil.grow(attributesCache);
+        }
+        attributesCache[id] = attr;
+    }
 }

+ 58 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.esql.io.stream;
 
 import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -19,6 +20,8 @@ import org.elasticsearch.compute.data.IntBigArrayBlock;
 import org.elasticsearch.compute.data.LongBigArrayBlock;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.esql.Column;
+import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanWriter;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.plan.logical.join.Join;
@@ -34,7 +37,14 @@ import java.util.function.Function;
  * A customized stream output used to serialize ESQL physical plan fragments. Complements stream
  * output with methods that write plan nodes, Attributes, Expressions, etc.
  */
-public final class PlanStreamOutput extends StreamOutput {
+public final class PlanStreamOutput extends StreamOutput implements org.elasticsearch.xpack.esql.core.util.PlanStreamOutput {
+
+    /**
+     * max number of attributes that can be cached for serialization
+     * <p>
+     * TODO should this be a cluster setting...?
+     */
+    protected static final int MAX_SERIALIZED_ATTRIBUTES = 1_000_000;
 
     /**
      * Cache of written blocks. We use an {@link IdentityHashMap} for this
@@ -44,6 +54,16 @@ public final class PlanStreamOutput extends StreamOutput {
      */
     private final Map<Block, BytesReference> cachedBlocks = new IdentityHashMap<>();
 
+    /**
+     * Cache for field attributes.
+     * Field attributes can be a significant part of the query execution plan, especially
+     * for queries like `from *`, that can have thousands of output columns.
+     * Attributes can be shared by many plan nodes (e.g. ExchangeSink output, Project output, EsRelation fields);
+     * in addition, multiple Attributes can share the same parent field.
+     * This cache allows sending each attribute only once; from the second occurrence, only an ID will be sent
+     */
+    protected final Map<Attribute, Integer> cachedAttributes = new IdentityHashMap<>();
+
     private final StreamOutput delegate;
     private final PlanNameRegistry registry;
 
@@ -51,16 +71,19 @@ public final class PlanStreamOutput extends StreamOutput {
 
     private int nextCachedBlock = 0;
 
+    private int maxSerializedAttributes;
+
     public PlanStreamOutput(StreamOutput delegate, PlanNameRegistry registry, @Nullable EsqlConfiguration configuration)
         throws IOException {
-        this(delegate, registry, configuration, PlanNamedTypes::name);
+        this(delegate, registry, configuration, PlanNamedTypes::name, MAX_SERIALIZED_ATTRIBUTES);
     }
 
     public PlanStreamOutput(
         StreamOutput delegate,
         PlanNameRegistry registry,
         @Nullable EsqlConfiguration configuration,
-        Function<Class<?>, String> nameSupplier
+        Function<Class<?>, String> nameSupplier,
+        int maxSerializedAttributes
     ) throws IOException {
         this.delegate = delegate;
         this.registry = registry;
@@ -72,6 +95,7 @@ public final class PlanStreamOutput extends StreamOutput {
                 }
             }
         }
+        this.maxSerializedAttributes = maxSerializedAttributes;
     }
 
     public void writeLogicalPlanNode(LogicalPlan logicalPlan) throws IOException {
@@ -158,6 +182,37 @@ public final class PlanStreamOutput extends StreamOutput {
         nextCachedBlock++;
     }
 
+    @Override
+    public boolean writeAttributeCacheHeader(Attribute attribute) throws IOException {
+        if (getTransportVersion().onOrAfter(TransportVersions.ESQL_ATTRIBUTE_CACHED_SERIALIZATION)) {
+            Integer cacheId = attributeIdFromCache(attribute);
+            if (cacheId != null) {
+                writeZLong(cacheId);
+                return false;
+            }
+
+            cacheId = cacheAttribute(attribute);
+            writeZLong(-1 - cacheId);
+        }
+        return true;
+    }
+
+    private Integer attributeIdFromCache(Attribute attr) {
+        return cachedAttributes.get(attr);
+    }
+
+    private int cacheAttribute(Attribute attr) {
+        if (cachedAttributes.containsKey(attr)) {
+            throw new IllegalArgumentException("Attribute already present in the serialization cache [" + attr + "]");
+        }
+        int id = cachedAttributes.size();
+        if (id >= maxSerializedAttributes) {
+            throw new InvalidArgumentException("Limit of the number of serialized attributes exceeded [{}]", maxSerializedAttributes);
+        }
+        cachedAttributes.put(attr, id);
+        return id;
+    }
+
     /**
      * The byte representing a {@link Block} sent for the first time. The byte
      * will be followed by a {@link StreamOutput#writeVInt} encoded identifier

+ 3 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/AbstractAttributeTestCase.java

@@ -12,12 +12,14 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.EsField;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
 import org.elasticsearch.xpack.esql.session.EsqlConfigurationSerializationTests;
 
 import java.io.IOException;
@@ -82,7 +84,7 @@ public abstract class AbstractAttributeTestCase<T extends Attribute> extends Abs
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
-            out.writeNamedWriteable(a);
+            new PlanStreamOutput(out, new PlanNameRegistry(), EsqlTestUtils.TEST_CFG).writeNamedWriteable(a);
         }
 
         @Override

+ 4 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MetadataAttributeTests.java

@@ -16,6 +16,10 @@ import org.elasticsearch.xpack.esql.core.type.DataType;
 public class MetadataAttributeTests extends AbstractAttributeTestCase<MetadataAttribute> {
     @Override
     protected MetadataAttribute create() {
+        return randomMetadataAttribute();
+    }
+
+    public static MetadataAttribute randomMetadataAttribute() {
         Source source = Source.EMPTY;
         String name = randomAlphaOfLength(5);
         DataType type = randomFrom(DataType.types());

+ 4 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/UnsupportedAttributeTests.java

@@ -15,6 +15,10 @@ import org.elasticsearch.xpack.esql.core.type.UnsupportedEsFieldTests;
 public class UnsupportedAttributeTests extends AbstractAttributeTestCase<UnsupportedAttribute> {
     @Override
     protected UnsupportedAttribute create() {
+        return randomUnsupportedAttribute();
+    }
+
+    public static UnsupportedAttribute randomUnsupportedAttribute() {
         String name = randomAlphaOfLength(5);
         UnsupportedEsField field = UnsupportedEsFieldTests.randomUnsupportedEsField(4);
         String customMessage = randomBoolean() ? null : randomAlphaOfLength(9);

+ 171 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutputTests.java

@@ -18,17 +18,31 @@ import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.TransportVersionUtils;
 import org.elasticsearch.xpack.esql.Column;
+import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
+import org.elasticsearch.xpack.esql.core.expression.NameId;
 import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.core.type.EsField;
+import org.elasticsearch.xpack.esql.expression.function.MetadataAttributeTests;
+import org.elasticsearch.xpack.esql.expression.function.ReferenceAttributeTests;
+import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
+import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttributeTests;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
 import org.elasticsearch.xpack.esql.session.EsqlConfigurationSerializationTests;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
 public class PlanStreamOutputTests extends ESTestCase {
@@ -113,6 +127,153 @@ public class PlanStreamOutputTests extends ESTestCase {
         }
     }
 
+    /**
+     * Checks that an attribute written many times is fully serialized only once and that
+     * every subsequent read resolves to the very same cached instance. For a
+     * {@link FieldAttribute}, the whole parent chain is expected to be cached as well,
+     * one cache slot per level of the chain.
+     */
+    public void testWriteAttributeMultipleTimes() throws IOException {
+        Attribute attribute = randomAttribute();
+        EsqlConfiguration configuration = EsqlConfigurationSerializationTests.randomConfiguration();
+        try (
+            BytesStreamOutput out = new BytesStreamOutput();
+            PlanStreamOutput planStream = new PlanStreamOutput(out, PlanNameRegistry.INSTANCE, configuration)
+        ) {
+            int occurrences = randomIntBetween(2, 150);
+            for (int i = 0; i < occurrences; i++) {
+                planStream.writeNamedWriteable(attribute);
+            }
+            // depth = the attribute itself plus all of its FieldAttribute ancestors
+            int depth = 0;
+            Attribute parent = attribute;
+            while (parent != null) {
+                depth++;
+                parent = parent instanceof FieldAttribute f ? f.parent() : null;
+            }
+            // one cache entry per level of the parent chain, no matter how often it was written
+            assertThat(planStream.cachedAttributes.size(), is(depth));
+            try (PlanStreamInput in = new PlanStreamInput(out.bytes().streamInput(), PlanNameRegistry.INSTANCE, REGISTRY, configuration)) {
+                Attribute first = in.readNamedWriteable(Attribute.class);
+                // every later read must yield the exact same instance, not merely an equal copy
+                for (int i = 1; i < occurrences; i++) {
+                    Attribute next = in.readNamedWriteable(Attribute.class);
+                    assertThat(first, sameInstance(next));
+                }
+                // walk both parent chains in lockstep; they must match level by level
+                for (int i = 0; i < depth; i++) {
+                    assertThat(first, equalTo(attribute));
+                    first = first instanceof FieldAttribute f ? f.parent() : null;
+                    attribute = attribute instanceof FieldAttribute f ? f.parent() : null;
+                }
+                // both chains must end at the same time
+                assertThat(first, is(nullValue()));
+                assertThat(attribute, is(nullValue()));
+            }
+        }
+    }
+
+    /**
+     * Round-trips a batch of random attributes written three times each: the first pass
+     * must deserialize to values equal to the originals, while the two later passes must
+     * resolve to the same instances produced by the first pass (served from the cache).
+     */
+    public void testWriteMultipleAttributes() throws IOException {
+        EsqlConfiguration configuration = EsqlConfigurationSerializationTests.randomConfiguration();
+        try (
+            BytesStreamOutput out = new BytesStreamOutput();
+            PlanStreamOutput planStream = new PlanStreamOutput(out, PlanNameRegistry.INSTANCE, configuration)
+        ) {
+            List<Attribute> attrs = new ArrayList<>();
+            int occurrences = randomIntBetween(2, 300);
+            for (int i = 0; i < occurrences; i++) {
+                attrs.add(randomAttribute());
+            }
+
+            // send all the attributes, three times
+            for (int i = 0; i < 3; i++) {
+                for (Attribute attr : attrs) {
+                    planStream.writeNamedWriteable(attr);
+                }
+            }
+
+            try (PlanStreamInput in = new PlanStreamInput(out.bytes().streamInput(), PlanNameRegistry.INSTANCE, REGISTRY, configuration)) {
+                List<Attribute> readAttrs = new ArrayList<>();
+                // first pass: value equality against the originals
+                for (int i = 0; i < occurrences; i++) {
+                    readAttrs.add(in.readNamedWriteable(Attribute.class));
+                    assertThat(readAttrs.get(i), equalTo(attrs.get(i)));
+                }
+                // two more times
+                for (int i = 0; i < 2; i++) {
+                    for (int j = 0; j < occurrences; j++) {
+                        // repeated occurrences must come out of the cache as the same instance
+                        Attribute attr = in.readNamedWriteable(Attribute.class);
+                        assertThat(attr, sameInstance(readAttrs.get(j)));
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * With the attribute cache capped at 10 entries, writing more distinct attributes than
+     * the cache can hold must fail with an {@link InvalidArgumentException} instead of
+     * silently overflowing. NOTE: a FieldAttribute's parents occupy cache slots too, so the
+     * limit may be reached before the 11th write — only the exception type is asserted.
+     */
+    public void testWriteMultipleAttributesWithSmallCache() throws IOException {
+        EsqlConfiguration configuration = EsqlConfigurationSerializationTests.randomConfiguration();
+        try (
+            BytesStreamOutput out = new BytesStreamOutput();
+            PlanStreamOutput planStream = new PlanStreamOutput(out, PlanNameRegistry.INSTANCE, configuration, PlanNamedTypes::name, 10)
+        ) {
+            expectThrows(InvalidArgumentException.class, () -> {
+                for (int i = 0; i <= 10; i++) {
+                    planStream.writeNamedWriteable(randomAttribute());
+                }
+            });
+        }
+    }
+
+    /**
+     * Two attributes that are identical except for their {@link NameId} must NOT be
+     * collapsed into one cache entry: both survive the round trip as equal values that
+     * still carry distinct ids.
+     */
+    public void testWriteEqualAttributesDifferentID() throws IOException {
+        EsqlConfiguration configuration = EsqlConfigurationSerializationTests.randomConfiguration();
+        try (
+            BytesStreamOutput out = new BytesStreamOutput();
+            PlanStreamOutput planStream = new PlanStreamOutput(out, PlanNameRegistry.INSTANCE, configuration)
+        ) {
+
+            Attribute one = randomAttribute();
+            // same attribute, fresh id — must be treated as a different cache key
+            Attribute two = one.withId(new NameId());
+
+            planStream.writeNamedWriteable(one);
+            planStream.writeNamedWriteable(two);
+
+            try (PlanStreamInput in = new PlanStreamInput(out.bytes().streamInput(), PlanNameRegistry.INSTANCE, REGISTRY, configuration)) {
+                Attribute oneCopy = in.readNamedWriteable(Attribute.class);
+                Attribute twoCopy = in.readNamedWriteable(Attribute.class);
+
+                assertThat(oneCopy, equalTo(one));
+                assertThat(twoCopy, equalTo(two));
+
+                // the ids must remain distinct after deserialization
+                assertThat(oneCopy.id(), not(equalTo(twoCopy.id())));
+            }
+        }
+    }
+
+    /**
+     * Two different attributes that happen to share the same {@link NameId} must both be
+     * serialized and read back as distinct (non-equal) values that still share the id —
+     * the cache must not conflate them just because the ids match.
+     */
+    public void testWriteDifferentAttributesSameID() throws IOException {
+        EsqlConfiguration configuration = EsqlConfigurationSerializationTests.randomConfiguration();
+        try (
+            BytesStreamOutput out = new BytesStreamOutput();
+            PlanStreamOutput planStream = new PlanStreamOutput(out, PlanNameRegistry.INSTANCE, configuration)
+        ) {
+
+            Attribute one = randomAttribute();
+            // a different random attribute forced onto the same id
+            Attribute two = randomAttribute().withId(one.id());
+
+            planStream.writeNamedWriteable(one);
+            planStream.writeNamedWriteable(two);
+
+            try (PlanStreamInput in = new PlanStreamInput(out.bytes().streamInput(), PlanNameRegistry.INSTANCE, REGISTRY, configuration)) {
+                Attribute oneCopy = in.readNamedWriteable(Attribute.class);
+                Attribute twoCopy = in.readNamedWriteable(Attribute.class);
+
+                assertThat(oneCopy, equalTo(one));
+                assertThat(twoCopy, equalTo(two));
+
+                // distinct values, shared id — both properties must survive the round trip
+                assertThat(oneCopy, not(equalTo(twoCopy)));
+                assertThat(oneCopy.id(), equalTo(twoCopy.id()));
+            }
+        }
+    }
+
+    /**
+     * Picks uniformly among the four concrete {@link Attribute} implementations
+     * (field, reference, unsupported, metadata) so the cache is exercised for all of them.
+     */
+    private static Attribute randomAttribute() {
+        return switch (randomInt(3)) {
+            case 0 -> PlanNamedTypesTests.randomFieldAttribute();
+            case 1 -> ReferenceAttributeTests.randomReferenceAttribute();
+            case 2 -> UnsupportedAttributeTests.randomUnsupportedAttribute();
+            case 3 -> MetadataAttributeTests.randomMetadataAttribute();
+            default -> throw new IllegalArgumentException();
+
+        };
+    }
+
     // Helper: build a random configuration carrying the given lookup tables.
     private EsqlConfiguration randomConfiguration(Map<String, Map<String, Column>> tables) {
         return EsqlConfigurationSerializationTests.randomConfiguration("query_" + randomAlphaOfLength(1), tables);
     }
@@ -133,5 +294,14 @@ public class PlanStreamOutputTests extends ESTestCase {
         BigArrays.NON_RECYCLING_INSTANCE
     );
 
-    private static final NamedWriteableRegistry REGISTRY = new NamedWriteableRegistry(Block.getNamedWriteables());
+    // The registry now also needs the Attribute and EsField writeables (plus
+    // UnsupportedAttribute, registered separately) so attributes can be round-tripped.
+    private static final NamedWriteableRegistry REGISTRY;
+
+    static {
+        List<NamedWriteableRegistry.Entry> writeables = new ArrayList<>();
+        writeables.addAll(Block.getNamedWriteables());
+        writeables.addAll(Attribute.getNamedWriteables());
+        writeables.add(UnsupportedAttribute.ENTRY);
+        writeables.addAll(EsField.getNamedWriteables());
+        // pass through a HashSet first to drop entries that appear in more than one list
+        REGISTRY = new NamedWriteableRegistry(new ArrayList<>(new HashSet<>(writeables)));
+    }
 }