Browse Source

Reapply "ESQL: Remove parent from FieldAttribute (#112881)" (#115006) (#115007) (#115035)

This reverts commit 17ecb66a0653bc561f6e44a3e688942e304664d5 and
reapplies https://github.com/elastic/elasticsearch/pull/112881 once the
previous, non-backported transport version bump is dealt with.
Alexander Spies 1 year ago
parent
commit
068f51d76a
30 changed files with 323 additions and 134 deletions
  1. 5 0
      docs/changelog/112881.yaml
  2. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  3. 4 3
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Alias.java
  4. 4 3
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Attribute.java
  5. 63 34
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/FieldAttribute.java
  6. 4 3
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/MetadataAttribute.java
  7. 3 2
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/NamedExpression.java
  8. 9 1
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/ReferenceAttribute.java
  9. 9 1
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/TypedAttribute.java
  10. 2 1
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/UnresolvedAttribute.java
  11. 4 4
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java
  12. 5 4
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DateEsField.java
  13. 6 3
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/EsField.java
  14. 5 4
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/InvalidMappedField.java
  15. 4 4
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/KeywordEsField.java
  16. 5 4
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/MultiTypeEsField.java
  17. 4 4
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/TextEsField.java
  18. 7 6
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/UnsupportedEsField.java
  19. 10 0
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/PlanStreamInput.java
  20. 12 0
      x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/PlanStreamOutput.java
  21. 3 3
      x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/expression/FieldAttributeTestUtils.java
  22. 6 6
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java
  23. 8 4
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/UnsupportedAttribute.java
  24. 9 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java
  25. 13 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java
  26. 6 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java
  27. 5 5
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/FieldAttributeTests.java
  28. 47 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/index/EsIndexSerializationTests.java
  29. 1 16
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutputTests.java
  30. 59 8
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExecSerializationTests.java

+ 5 - 0
docs/changelog/112881.yaml

@@ -0,0 +1,5 @@
+pr: 112881
+summary: "ESQL: Remove parent from `FieldAttribute`"
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -176,6 +176,7 @@ public class TransportVersions {
     public static final TransportVersion CONVERT_FAILURE_STORE_OPTIONS_TO_SELECTOR_OPTIONS_INTERNALLY = def(8_772_00_0);
     public static final TransportVersion REMOVE_MIN_COMPATIBLE_SHARD_NODE = def(8_773_00_0);
     public static final TransportVersion REVERT_REMOVE_MIN_COMPATIBLE_SHARD_NODE = def(8_774_00_0);
+    public static final TransportVersion ESQL_FIELD_ATTRIBUTE_PARENT_SIMPLIFIED = def(8_775_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 4 - 3
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Alias.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.core.expression;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -42,11 +43,11 @@ public final class Alias extends NamedExpression {
         this(source, name, child, null);
     }
 
-    public Alias(Source source, String name, Expression child, NameId id) {
+    public Alias(Source source, String name, Expression child, @Nullable NameId id) {
         this(source, name, child, id, false);
     }
 
-    public Alias(Source source, String name, Expression child, NameId id, boolean synthetic) {
+    public Alias(Source source, String name, Expression child, @Nullable NameId id, boolean synthetic) {
         super(source, name, singletonList(child), id, synthetic);
         this.child = child;
     }
@@ -55,7 +56,7 @@ public final class Alias extends NamedExpression {
     /**
      * Old constructor from when this had a qualifier string. Still needed to not break serialization.
      */
-    private Alias(Source source, String name, String qualifier, Expression child, NameId id, boolean synthetic) {
+    private Alias(Source source, String name, String qualifier, Expression child, @Nullable NameId id, boolean synthetic) {
         this(source, name, child, id, synthetic);
     }
 

+ 4 - 3
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Attribute.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.esql.core.expression;
 
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 
@@ -41,15 +42,15 @@ public abstract class Attribute extends NamedExpression {
     // can the attr be null - typically used in JOINs
     private final Nullability nullability;
 
-    public Attribute(Source source, String name, NameId id) {
+    public Attribute(Source source, String name, @Nullable NameId id) {
         this(source, name, Nullability.TRUE, id);
     }
 
-    public Attribute(Source source, String name, Nullability nullability, NameId id) {
+    public Attribute(Source source, String name, Nullability nullability, @Nullable NameId id) {
         this(source, name, nullability, id, false);
     }
 
-    public Attribute(Source source, String name, Nullability nullability, NameId id, boolean synthetic) {
+    public Attribute(Source source, String name, Nullability nullability, @Nullable NameId id, boolean synthetic) {
         super(source, name, emptyList(), id, synthetic);
         this.nullability = nullability;
     }

+ 63 - 34
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/FieldAttribute.java

@@ -6,21 +6,25 @@
  */
 package org.elasticsearch.xpack.esql.core.expression;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.EsField;
 import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
 import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
-import org.elasticsearch.xpack.esql.core.util.StringUtils;
 
 import java.io.IOException;
 import java.util.Objects;
 
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
+
 /**
  * Attribute for an ES field.
  * To differentiate between the different type of fields this class offers:
@@ -37,32 +41,31 @@ public class FieldAttribute extends TypedAttribute {
         FieldAttribute::readFrom
     );
 
-    private final FieldAttribute parent;
-    private final String path;
+    private final String parentName;
     private final EsField field;
 
     public FieldAttribute(Source source, String name, EsField field) {
         this(source, null, name, field);
     }
 
-    public FieldAttribute(Source source, FieldAttribute parent, String name, EsField field) {
-        this(source, parent, name, field, Nullability.TRUE, null, false);
+    public FieldAttribute(Source source, @Nullable String parentName, String name, EsField field) {
+        this(source, parentName, name, field, Nullability.TRUE, null, false);
     }
 
-    public FieldAttribute(Source source, FieldAttribute parent, String name, EsField field, boolean synthetic) {
-        this(source, parent, name, field, Nullability.TRUE, null, synthetic);
+    public FieldAttribute(Source source, @Nullable String parentName, String name, EsField field, boolean synthetic) {
+        this(source, parentName, name, field, Nullability.TRUE, null, synthetic);
     }
 
     public FieldAttribute(
         Source source,
-        FieldAttribute parent,
+        @Nullable String parentName,
         String name,
         EsField field,
         Nullability nullability,
-        NameId id,
+        @Nullable NameId id,
         boolean synthetic
     ) {
-        this(source, parent, name, field.getDataType(), field, nullability, id, synthetic);
+        this(source, parentName, name, field.getDataType(), field, nullability, id, synthetic);
     }
 
     /**
@@ -71,17 +74,16 @@ public class FieldAttribute extends TypedAttribute {
      */
     FieldAttribute(
         Source source,
-        FieldAttribute parent,
+        @Nullable String parentName,
         String name,
         DataType type,
         EsField field,
         Nullability nullability,
-        NameId id,
+        @Nullable NameId id,
         boolean synthetic
     ) {
         super(source, name, type, nullability, id, synthetic);
-        this.path = parent != null ? parent.name() : StringUtils.EMPTY;
-        this.parent = parent;
+        this.parentName = parentName;
         this.field = field;
     }
 
@@ -91,16 +93,16 @@ public class FieldAttribute extends TypedAttribute {
      */
     private FieldAttribute(
         Source source,
-        FieldAttribute parent,
+        @Nullable String parentName,
         String name,
         DataType type,
         EsField field,
-        String qualifier,
+        @Nullable String qualifier,
         Nullability nullability,
-        NameId id,
+        @Nullable NameId id,
         boolean synthetic
     ) {
-        this(source, parent, name, type, field, nullability, id, synthetic);
+        this(source, parentName, name, type, field, nullability, id, synthetic);
     }
 
     private FieldAttribute(StreamInput in) throws IOException {
@@ -114,8 +116,8 @@ public class FieldAttribute extends TypedAttribute {
          */
         this(
             Source.readFrom((StreamInput & PlanStreamInput) in),
-            in.readOptionalWriteable(FieldAttribute::readFrom),
-            ((PlanStreamInput) in).readCachedString(),
+            readParentName(in),
+            readCachedStringWithVersionCheck(in),
             DataType.readFrom(in),
             EsField.readFrom(in),
             in.readOptionalString(),
@@ -129,8 +131,8 @@ public class FieldAttribute extends TypedAttribute {
     public void writeTo(StreamOutput out) throws IOException {
         if (((PlanStreamOutput) out).writeAttributeCacheHeader(this)) {
             Source.EMPTY.writeTo(out);
-            out.writeOptionalWriteable(parent);
-            ((PlanStreamOutput) out).writeCachedString(name());
+            writeParentName(out);
+            writeCachedStringWithVersionCheck(out, name());
             dataType().writeTo(out);
             field.writeTo(out);
             // We used to write the qualifier here. We can still do if needed in the future.
@@ -145,6 +147,26 @@ public class FieldAttribute extends TypedAttribute {
         return ((PlanStreamInput) in).readAttributeWithCache(FieldAttribute::new);
     }
 
+    private void writeParentName(StreamOutput out) throws IOException {
+        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_FIELD_ATTRIBUTE_PARENT_SIMPLIFIED)) {
+            ((PlanStreamOutput) out).writeOptionalCachedString(parentName);
+        } else {
+            // Previous versions only used the parent field attribute to retrieve the parent's name, so we can use just any
+            // fake FieldAttribute here as long as the name is correct.
+            FieldAttribute fakeParent = parentName() == null ? null : new FieldAttribute(Source.EMPTY, parentName(), field());
+            out.writeOptionalWriteable(fakeParent);
+        }
+    }
+
+    private static String readParentName(StreamInput in) throws IOException {
+        if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_FIELD_ATTRIBUTE_PARENT_SIMPLIFIED)) {
+            return ((PlanStreamInput) in).readOptionalCachedString();
+        }
+
+        FieldAttribute parent = in.readOptionalWriteable(FieldAttribute::readFrom);
+        return parent == null ? null : parent.name();
+    }
+
     @Override
     public String getWriteableName() {
         return ENTRY.name;
@@ -152,15 +174,22 @@ public class FieldAttribute extends TypedAttribute {
 
     @Override
     protected NodeInfo<FieldAttribute> info() {
-        return NodeInfo.create(this, FieldAttribute::new, parent, name(), dataType(), field, (String) null, nullable(), id(), synthetic());
-    }
-
-    public FieldAttribute parent() {
-        return parent;
+        return NodeInfo.create(
+            this,
+            FieldAttribute::new,
+            parentName,
+            name(),
+            dataType(),
+            field,
+            (String) null,
+            nullable(),
+            id(),
+            synthetic()
+        );
     }
 
-    public String path() {
-        return path;
+    public String parentName() {
+        return parentName;
     }
 
     /**
@@ -174,7 +203,7 @@ public class FieldAttribute extends TypedAttribute {
         if ((synthetic() || name().startsWith(SYNTHETIC_ATTRIBUTE_NAME_PREFIX)) == false) {
             return name();
         }
-        return Strings.hasText(path) ? path + "." + field.getName() : field.getName();
+        return Strings.hasText(parentName) ? parentName + "." + field.getName() : field.getName();
     }
 
     public EsField.Exact getExactInfo() {
@@ -190,13 +219,13 @@ public class FieldAttribute extends TypedAttribute {
     }
 
     private FieldAttribute innerField(EsField type) {
-        return new FieldAttribute(source(), this, name() + "." + type.getName(), type, nullable(), id(), synthetic());
+        return new FieldAttribute(source(), name(), name() + "." + type.getName(), type, nullable(), id(), synthetic());
     }
 
     @Override
     protected Attribute clone(Source source, String name, DataType type, Nullability nullability, NameId id, boolean synthetic) {
         // Ignore `type`, this must be the same as the field's type.
-        return new FieldAttribute(source, parent, name, field, nullability, id, synthetic);
+        return new FieldAttribute(source, parentName, name, field, nullability, id, synthetic);
     }
 
     @Override
@@ -206,13 +235,13 @@ public class FieldAttribute extends TypedAttribute {
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), path, field);
+        return Objects.hash(super.hashCode(), parentName, field);
     }
 
     @Override
     public boolean equals(Object obj) {
         return super.equals(obj)
-            && Objects.equals(path, ((FieldAttribute) obj).path)
+            && Objects.equals(parentName, ((FieldAttribute) obj).parentName)
             && Objects.equals(field, ((FieldAttribute) obj).field);
     }
 

+ 4 - 3
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/MetadataAttribute.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.esql.core.expression;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.IgnoredFieldMapper;
@@ -59,7 +60,7 @@ public class MetadataAttribute extends TypedAttribute {
         String name,
         DataType dataType,
         Nullability nullability,
-        NameId id,
+        @Nullable NameId id,
         boolean synthetic,
         boolean searchable
     ) {
@@ -79,9 +80,9 @@ public class MetadataAttribute extends TypedAttribute {
         Source source,
         String name,
         DataType dataType,
-        String qualifier,
+        @Nullable String qualifier,
         Nullability nullability,
-        NameId id,
+        @Nullable NameId id,
         boolean synthetic,
         boolean searchable
     ) {

+ 3 - 2
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/NamedExpression.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.core.expression;
 
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 
 import java.util.ArrayList;
@@ -32,11 +33,11 @@ public abstract class NamedExpression extends Expression implements NamedWriteab
     private final NameId id;
     private final boolean synthetic;
 
-    public NamedExpression(Source source, String name, List<Expression> children, NameId id) {
+    public NamedExpression(Source source, String name, List<Expression> children, @Nullable NameId id) {
         this(source, name, children, id, false);
     }
 
-    public NamedExpression(Source source, String name, List<Expression> children, NameId id, boolean synthetic) {
+    public NamedExpression(Source source, String name, List<Expression> children, @Nullable NameId id, boolean synthetic) {
         super(source, children);
         this.name = name;
         this.id = id == null ? new NameId() : id;

+ 9 - 1
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/ReferenceAttribute.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.core.expression;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -31,7 +32,14 @@ public class ReferenceAttribute extends TypedAttribute {
         this(source, name, dataType, Nullability.FALSE, null, false);
     }
 
-    public ReferenceAttribute(Source source, String name, DataType dataType, Nullability nullability, NameId id, boolean synthetic) {
+    public ReferenceAttribute(
+        Source source,
+        String name,
+        DataType dataType,
+        Nullability nullability,
+        @Nullable NameId id,
+        boolean synthetic
+    ) {
         super(source, name, dataType, nullability, id, synthetic);
     }
 

+ 9 - 1
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/TypedAttribute.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.esql.core.expression;
 
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 
@@ -15,7 +16,14 @@ public abstract class TypedAttribute extends Attribute {
 
     private final DataType dataType;
 
-    protected TypedAttribute(Source source, String name, DataType dataType, Nullability nullability, NameId id, boolean synthetic) {
+    protected TypedAttribute(
+        Source source,
+        String name,
+        DataType dataType,
+        Nullability nullability,
+        @Nullable NameId id,
+        boolean synthetic
+    ) {
         super(source, name, nullability, id, synthetic);
         this.dataType = dataType;
     }

+ 2 - 1
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/UnresolvedAttribute.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.esql.core.expression;
 
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.esql.core.capabilities.Unresolvable;
 import org.elasticsearch.xpack.esql.core.capabilities.UnresolvedException;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
@@ -33,7 +34,7 @@ public class UnresolvedAttribute extends Attribute implements Unresolvable {
     }
 
     @SuppressWarnings("this-escape")
-    public UnresolvedAttribute(Source source, String name, NameId id, String unresolvedMessage, Object resolutionMetadata) {
+    public UnresolvedAttribute(Source source, String name, @Nullable NameId id, String unresolvedMessage, Object resolutionMetadata) {
         super(source, name, id);
         this.customMessage = unresolvedMessage != null;
         this.unresolvedMsg = unresolvedMessage == null ? errorMessage(name(), null) : unresolvedMessage;

+ 4 - 4
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java

@@ -14,8 +14,6 @@ import org.elasticsearch.index.IndexMode;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
 import org.elasticsearch.xpack.esql.core.plugin.EsqlCorePlugin;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 
 import java.io.IOException;
 import java.math.BigInteger;
@@ -32,6 +30,8 @@ import java.util.function.Function;
 
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.Collectors.toUnmodifiableMap;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
 
 public enum DataType {
     /**
@@ -535,12 +535,12 @@ public enum DataType {
     }
 
     public void writeTo(StreamOutput out) throws IOException {
-        ((PlanStreamOutput) out).writeCachedString(typeName);
+        writeCachedStringWithVersionCheck(out, typeName);
     }
 
     public static DataType readFrom(StreamInput in) throws IOException {
         // TODO: Use our normal enum serialization pattern
-        return readFrom(((PlanStreamInput) in).readCachedString());
+        return readFrom(readCachedStringWithVersionCheck(in));
     }
 
     /**

+ 5 - 4
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DateEsField.java

@@ -8,12 +8,13 @@ package org.elasticsearch.xpack.esql.core.type;
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 
 import java.io.IOException;
 import java.util.Map;
 
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
+
 /**
  * Information about a field in an ES index with the {@code date} type
  */
@@ -28,12 +29,12 @@ public class DateEsField extends EsField {
     }
 
     protected DateEsField(StreamInput in) throws IOException {
-        this(((PlanStreamInput) in).readCachedString(), DataType.DATETIME, in.readImmutableMap(EsField::readFrom), in.readBoolean());
+        this(readCachedStringWithVersionCheck(in), DataType.DATETIME, in.readImmutableMap(EsField::readFrom), in.readBoolean());
     }
 
     @Override
     public void writeContent(StreamOutput out) throws IOException {
-        ((PlanStreamOutput) out).writeCachedString(getName());
+        writeCachedStringWithVersionCheck(out, getName());
         out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
         out.writeBoolean(isAggregatable());
     }

+ 6 - 3
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/EsField.java

@@ -18,6 +18,9 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
+
 /**
  * Information about a field in an ES index.
  */
@@ -60,7 +63,7 @@ public class EsField implements Writeable {
     }
 
     public EsField(StreamInput in) throws IOException {
-        this.name = ((PlanStreamInput) in).readCachedString();
+        this.name = readCachedStringWithVersionCheck(in);
         this.esDataType = readDataType(in);
         this.properties = in.readImmutableMap(EsField::readFrom);
         this.aggregatable = in.readBoolean();
@@ -68,7 +71,7 @@ public class EsField implements Writeable {
     }
 
     private DataType readDataType(StreamInput in) throws IOException {
-        String name = ((PlanStreamInput) in).readCachedString();
+        String name = readCachedStringWithVersionCheck(in);
         if (in.getTransportVersion().before(TransportVersions.ESQL_NESTED_UNSUPPORTED) && name.equalsIgnoreCase("NESTED")) {
             /*
              * The "nested" data type existed in older versions of ESQL but was
@@ -98,7 +101,7 @@ public class EsField implements Writeable {
      * This needs to be overridden by subclasses for specific serialization
      */
     public void writeContent(StreamOutput out) throws IOException {
-        ((PlanStreamOutput) out).writeCachedString(name);
+        writeCachedStringWithVersionCheck(out, name);
         esDataType.writeTo(out);
         out.writeMap(properties, (o, x) -> x.writeTo(out));
         out.writeBoolean(aggregatable);

+ 5 - 4
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/InvalidMappedField.java

@@ -10,8 +10,6 @@ package org.elasticsearch.xpack.esql.core.type;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 
 import java.io.IOException;
 import java.util.Map;
@@ -20,6 +18,9 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
+
 /**
  * Representation of field mapped differently across indices.
  * Used during mapping discovery only.
@@ -54,7 +55,7 @@ public class InvalidMappedField extends EsField {
     }
 
     protected InvalidMappedField(StreamInput in) throws IOException {
-        this(((PlanStreamInput) in).readCachedString(), in.readString(), in.readImmutableMap(StreamInput::readString, EsField::readFrom));
+        this(readCachedStringWithVersionCheck(in), in.readString(), in.readImmutableMap(StreamInput::readString, EsField::readFrom));
     }
 
     public Set<DataType> types() {
@@ -63,7 +64,7 @@ public class InvalidMappedField extends EsField {
 
     @Override
     public void writeContent(StreamOutput out) throws IOException {
-        ((PlanStreamOutput) out).writeCachedString(getName());
+        writeCachedStringWithVersionCheck(out, getName());
         out.writeString(errorMessage);
         out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
     }

+ 4 - 4
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/KeywordEsField.java

@@ -8,8 +8,6 @@ package org.elasticsearch.xpack.esql.core.type;
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -17,6 +15,8 @@ import java.util.Map;
 import java.util.Objects;
 
 import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
 
 /**
  * Information about a field in an ES index with the {@code keyword} type.
@@ -61,7 +61,7 @@ public class KeywordEsField extends EsField {
 
     public KeywordEsField(StreamInput in) throws IOException {
         this(
-            ((PlanStreamInput) in).readCachedString(),
+            readCachedStringWithVersionCheck(in),
             KEYWORD,
             in.readImmutableMap(EsField::readFrom),
             in.readBoolean(),
@@ -73,7 +73,7 @@ public class KeywordEsField extends EsField {
 
     @Override
     public void writeContent(StreamOutput out) throws IOException {
-        ((PlanStreamOutput) out).writeCachedString(getName());
+        writeCachedStringWithVersionCheck(out, getName());
         out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
         out.writeBoolean(isAggregatable());
         out.writeInt(precision);

+ 5 - 4
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/MultiTypeEsField.java

@@ -10,8 +10,6 @@ package org.elasticsearch.xpack.esql.core.type;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -19,6 +17,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
+
 /**
  * During IndexResolution it could occur that the same field is mapped to different types in different indices.
  * The class MultiTypeEfField.UnresolvedField holds that information and allows for later resolution of the field
@@ -39,7 +40,7 @@ public class MultiTypeEsField extends EsField {
 
     protected MultiTypeEsField(StreamInput in) throws IOException {
         this(
-            ((PlanStreamInput) in).readCachedString(),
+            readCachedStringWithVersionCheck(in),
             DataType.readFrom(in),
             in.readBoolean(),
             in.readImmutableMap(i -> i.readNamedWriteable(Expression.class))
@@ -48,7 +49,7 @@ public class MultiTypeEsField extends EsField {
 
     @Override
     public void writeContent(StreamOutput out) throws IOException {
-        ((PlanStreamOutput) out).writeCachedString(getName());
+        writeCachedStringWithVersionCheck(out, getName());
         getDataType().writeTo(out);
         out.writeBoolean(isAggregatable());
         out.writeMap(getIndexToConversionExpressions(), (o, v) -> out.writeNamedWriteable(v));

+ 4 - 4
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/TextEsField.java

@@ -10,8 +10,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 
 import java.io.IOException;
 import java.util.Map;
@@ -19,6 +17,8 @@ import java.util.function.Function;
 
 import static org.elasticsearch.xpack.esql.core.type.DataType.KEYWORD;
 import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
 
 /**
  * Information about a field in an es index with the {@code text} type.
@@ -34,12 +34,12 @@ public class TextEsField extends EsField {
     }
 
     protected TextEsField(StreamInput in) throws IOException {
-        this(((PlanStreamInput) in).readCachedString(), in.readImmutableMap(EsField::readFrom), in.readBoolean(), in.readBoolean());
+        this(readCachedStringWithVersionCheck(in), in.readImmutableMap(EsField::readFrom), in.readBoolean(), in.readBoolean());
     }
 
     @Override
     public void writeContent(StreamOutput out) throws IOException {
-        ((PlanStreamOutput) out).writeCachedString(getName());
+        writeCachedStringWithVersionCheck(out, getName());
         out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
         out.writeBoolean(isAggregatable());
         out.writeBoolean(isAlias());

+ 7 - 6
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/UnsupportedEsField.java

@@ -8,14 +8,15 @@ package org.elasticsearch.xpack.esql.core.type;
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
-import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
 
 import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
 
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
+
 /**
  * Information about a field in an ES index that cannot be supported by ESQL.
  * All the subfields (properties) of an unsupported type are also be unsupported.
@@ -37,8 +38,8 @@ public class UnsupportedEsField extends EsField {
 
     public UnsupportedEsField(StreamInput in) throws IOException {
         this(
-            ((PlanStreamInput) in).readCachedString(),
-            ((PlanStreamInput) in).readCachedString(),
+            readCachedStringWithVersionCheck(in),
+            readCachedStringWithVersionCheck(in),
             in.readOptionalString(),
             in.readImmutableMap(EsField::readFrom)
         );
@@ -46,8 +47,8 @@ public class UnsupportedEsField extends EsField {
 
     @Override
     public void writeContent(StreamOutput out) throws IOException {
-        ((PlanStreamOutput) out).writeCachedString(getName());
-        ((PlanStreamOutput) out).writeCachedString(getOriginalType());
+        writeCachedStringWithVersionCheck(out, getName());
+        writeCachedStringWithVersionCheck(out, getOriginalType());
         out.writeOptionalString(getInherited());
         out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
     }

+ 10 - 0
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/PlanStreamInput.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.esql.core.util;
 
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.core.CheckedFunction;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
@@ -49,4 +50,13 @@ public interface PlanStreamInput {
     <A extends EsField> A readEsFieldWithCache() throws IOException;
 
     String readCachedString() throws IOException;
+
+    static String readCachedStringWithVersionCheck(StreamInput planStreamInput) throws IOException {
+        if (planStreamInput.getTransportVersion().before(TransportVersions.ESQL_CACHED_STRING_SERIALIZATION)) {
+            return planStreamInput.readString();
+        }
+        return ((PlanStreamInput) planStreamInput).readCachedString();
+    }
+
+    String readOptionalCachedString() throws IOException;
 }

+ 12 - 0
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/PlanStreamOutput.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.xpack.esql.core.util;
 
+import org.elasticsearch.TransportVersions;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.type.EsField;
 
@@ -33,4 +35,14 @@ public interface PlanStreamOutput {
     boolean writeEsFieldCacheHeader(EsField field) throws IOException;
 
     void writeCachedString(String field) throws IOException;
+
+    static void writeCachedStringWithVersionCheck(StreamOutput planStreamOutput, String string) throws IOException {
+        if (planStreamOutput.getTransportVersion().before(TransportVersions.ESQL_CACHED_STRING_SERIALIZATION)) {
+            planStreamOutput.writeString(string);
+        } else {
+            ((PlanStreamOutput) planStreamOutput).writeCachedString(string);
+        }
+    }
+
+    void writeOptionalCachedString(String str) throws IOException;
 }

+ 3 - 3
x-pack/plugin/esql-core/src/test/java/org/elasticsearch/xpack/esql/core/expression/FieldAttributeTestUtils.java

@@ -12,9 +12,9 @@ import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.EsField;
 
 public class FieldAttributeTestUtils {
-    public static final FieldAttribute newFieldAttributeWithType(
+    public static FieldAttribute newFieldAttributeWithType(
         Source source,
-        FieldAttribute parent,
+        String parentName,
         String name,
         DataType type,
         EsField field,
@@ -22,6 +22,6 @@ public class FieldAttributeTestUtils {
         NameId id,
         boolean synthetic
     ) {
-        return new FieldAttribute(source, parent, name, type, field, nullability, id, synthetic);
+        return new FieldAttribute(source, parentName, name, type, field, nullability, id, synthetic);
     }
 }

+ 6 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

@@ -228,13 +228,13 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
         return list;
     }
 
-    private static void mappingAsAttributes(List<Attribute> list, Source source, FieldAttribute parent, Map<String, EsField> mapping) {
+    private static void mappingAsAttributes(List<Attribute> list, Source source, String parentName, Map<String, EsField> mapping) {
         for (Map.Entry<String, EsField> entry : mapping.entrySet()) {
             String name = entry.getKey();
             EsField t = entry.getValue();
 
             if (t != null) {
-                name = parent == null ? name : parent.fieldName() + "." + name;
+                name = parentName == null ? name : parentName + "." + name;
                 var fieldProperties = t.getProperties();
                 var type = t.getDataType().widenSmallNumeric();
                 // due to a bug also copy the field since the Attribute hierarchy extracts the data type
@@ -245,14 +245,14 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
 
                 FieldAttribute attribute = t instanceof UnsupportedEsField uef
                     ? new UnsupportedAttribute(source, name, uef)
-                    : new FieldAttribute(source, parent, name, t);
+                    : new FieldAttribute(source, parentName, name, t);
                 // primitive branch
                 if (DataType.isPrimitive(type)) {
                     list.add(attribute);
                 }
                 // allow compound object even if they are unknown
                 if (fieldProperties.isEmpty() == false) {
-                    mappingAsAttributes(list, source, attribute, fieldProperties);
+                    mappingAsAttributes(list, source, attribute.name(), fieldProperties);
                 }
             }
         }
@@ -1252,7 +1252,7 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
             // NOTE: The name has to start with $$ to not break bwc with 8.15 - in that version, this is how we had to mark this as
             // synthetic to work around a bug.
             String unionTypedFieldName = Attribute.rawTemporaryName(fa.name(), "converted_to", resolvedField.getDataType().typeName());
-            FieldAttribute unionFieldAttribute = new FieldAttribute(fa.source(), fa.parent(), unionTypedFieldName, resolvedField, true);
+            FieldAttribute unionFieldAttribute = new FieldAttribute(fa.source(), fa.parentName(), unionTypedFieldName, resolvedField, true);
             int existingIndex = unionFieldAttributes.indexOf(unionFieldAttribute);
             if (existingIndex >= 0) {
                 // Do not generate multiple name/type combinations with different IDs
@@ -1281,7 +1281,7 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
             FieldAttribute originalFieldAttr = (FieldAttribute) convert.field();
             FieldAttribute resolvedAttr = new FieldAttribute(
                 source,
-                originalFieldAttr.parent(),
+                originalFieldAttr.parentName(),
                 originalFieldAttr.name(),
                 field,
                 originalFieldAttr.nullable(),

+ 8 - 4
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/UnsupportedAttribute.java

@@ -11,6 +11,7 @@ import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.esql.core.capabilities.Unresolvable;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
@@ -29,6 +30,9 @@ import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 import java.io.IOException;
 import java.util.Objects;
 
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
+
 /**
  * Unsupported attribute meaning an attribute that has been found yet cannot be used (hence why UnresolvedAttribute
  * cannot be used) expect in special conditions (currently only in projections to allow it to flow through
@@ -63,11 +67,11 @@ public final class UnsupportedAttribute extends FieldAttribute implements Unreso
         this(source, name, field, null);
     }
 
-    public UnsupportedAttribute(Source source, String name, UnsupportedEsField field, String customMessage) {
+    public UnsupportedAttribute(Source source, String name, UnsupportedEsField field, @Nullable String customMessage) {
         this(source, name, field, customMessage, null);
     }
 
-    public UnsupportedAttribute(Source source, String name, UnsupportedEsField field, String customMessage, NameId id) {
+    public UnsupportedAttribute(Source source, String name, UnsupportedEsField field, @Nullable String customMessage, @Nullable NameId id) {
         super(source, null, name, field, Nullability.TRUE, id, false);
         this.hasCustomMessage = customMessage != null;
         this.message = customMessage == null ? errorMessage(name(), field) : customMessage;
@@ -76,7 +80,7 @@ public final class UnsupportedAttribute extends FieldAttribute implements Unreso
     private UnsupportedAttribute(StreamInput in) throws IOException {
         this(
             Source.readFrom((PlanStreamInput) in),
-            ((PlanStreamInput) in).readCachedString(),
+            readCachedStringWithVersionCheck(in),
             in.getTransportVersion().onOrAfter(TransportVersions.ESQL_ES_FIELD_CACHED_SERIALIZATION)
                 || in.getTransportVersion().isPatchFrom(TransportVersions.V_8_15_2) ? EsField.readFrom(in) : new UnsupportedEsField(in),
             in.readOptionalString(),
@@ -88,7 +92,7 @@ public final class UnsupportedAttribute extends FieldAttribute implements Unreso
     public void writeTo(StreamOutput out) throws IOException {
         if (((PlanStreamOutput) out).writeAttributeCacheHeader(this)) {
             Source.EMPTY.writeTo(out);
-            ((PlanStreamOutput) out).writeCachedString(name());
+            writeCachedStringWithVersionCheck(out, name());
             if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_ES_FIELD_CACHED_SERIALIZATION)
                 || out.getTransportVersion().isPatchFrom(TransportVersions.V_8_15_2)) {
                 field().writeTo(out);

+ 9 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java

@@ -37,6 +37,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.LongFunction;
 
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamInput.readCachedStringWithVersionCheck;
+
 /**
  * A customized stream input used to deserialize ESQL physical plan fragments. Complements stream
  * input with methods that read plan nodes, Attributes, Expressions, etc.
@@ -224,7 +226,7 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput
             // it's safe to cast to int, since the max value for this is {@link PlanStreamOutput#MAX_SERIALIZED_ATTRIBUTES}
             int cacheId = Math.toIntExact(readZLong());
             if (cacheId < 0) {
-                String className = readCachedString();
+                String className = readCachedStringWithVersionCheck(this);
                 Writeable.Reader<? extends EsField> reader = EsField.getReader(className);
                 cacheId = -1 - cacheId;
                 EsField result = reader.read(this);
@@ -234,7 +236,7 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput
                 return (A) esFieldFromCache(cacheId);
             }
         } else {
-            String className = readCachedString();
+            String className = readCachedStringWithVersionCheck(this);
             Writeable.Reader<? extends EsField> reader = EsField.getReader(className);
             return (A) reader.read(this);
         }
@@ -245,9 +247,6 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput
      */
     @Override
     public String readCachedString() throws IOException {
-        if (getTransportVersion().before(TransportVersions.ESQL_CACHED_STRING_SERIALIZATION)) {
-            return readString();
-        }
         int cacheId = Math.toIntExact(readZLong());
         if (cacheId < 0) {
             String string = readString();
@@ -259,6 +258,11 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput
         }
     }
 
+    @Override
+    public String readOptionalCachedString() throws IOException {
+        return readBoolean() ? readCachedString() : null;
+    }
+
     private EsField esFieldFromCache(int id) throws IOException {
         EsField field = esFieldsCache[id];
         if (field == null) {

+ 13 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java

@@ -30,6 +30,8 @@ import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Map;
 
+import static org.elasticsearch.xpack.esql.core.util.PlanStreamOutput.writeCachedStringWithVersionCheck;
+
 /**
  * A customized stream output used to serialize ESQL physical plan fragments. Complements stream
  * output with methods that write plan nodes, Attributes, Expressions, etc.
@@ -195,7 +197,7 @@ public final class PlanStreamOutput extends StreamOutput implements org.elastics
             cacheId = cacheEsField(field);
             writeZLong(-1 - cacheId);
         }
-        writeCachedString(field.getWriteableName());
+        writeCachedStringWithVersionCheck(this, field.getWriteableName());
         return true;
     }
 
@@ -207,10 +209,6 @@ public final class PlanStreamOutput extends StreamOutput implements org.elastics
      */
     @Override
     public void writeCachedString(String string) throws IOException {
-        if (getTransportVersion().before(TransportVersions.ESQL_CACHED_STRING_SERIALIZATION)) {
-            writeString(string);
-            return;
-        }
         Integer cacheId = stringCache.get(string);
         if (cacheId != null) {
             writeZLong(cacheId);
@@ -226,6 +224,16 @@ public final class PlanStreamOutput extends StreamOutput implements org.elastics
         writeString(string);
     }
 
+    @Override
+    public void writeOptionalCachedString(String str) throws IOException {
+        if (str == null) {
+            writeBoolean(false);
+        } else {
+            writeBoolean(true);
+            writeCachedString(str);
+        }
+    }
+
     private Integer esFieldIdFromCache(EsField field) {
         return cachedEsFields.get(field);
     }

+ 6 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/EsRelation.java

@@ -112,7 +112,12 @@ public class EsRelation extends LeafPlan {
             EsField t = entry.getValue();
 
             if (t != null) {
-                FieldAttribute f = new FieldAttribute(source, parent, parent != null ? parent.name() + "." + name : name, t);
+                FieldAttribute f = new FieldAttribute(
+                    source,
+                    parent != null ? parent.name() : null,
+                    parent != null ? parent.name() + "." + name : name,
+                    t
+                );
                 list.add(f);
                 // object or nested
                 if (t.getProperties().isEmpty() == false) {

+ 5 - 5
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/FieldAttributeTests.java

@@ -20,7 +20,7 @@ import static org.elasticsearch.xpack.esql.core.expression.FieldAttributeTestUti
 public class FieldAttributeTests extends AbstractAttributeTestCase<FieldAttribute> {
     public static FieldAttribute createFieldAttribute(int maxDepth, boolean onlyRepresentable) {
         Source source = Source.EMPTY;
-        FieldAttribute parent = maxDepth == 0 || randomBoolean() ? null : createFieldAttribute(maxDepth - 1, onlyRepresentable);
+        String parentName = maxDepth == 0 || randomBoolean() ? null : randomAlphaOfLength(3);
         String name = randomAlphaOfLength(5);
         DataType type = onlyRepresentable
             ? randomValueOtherThanMany(t -> false == DataType.isRepresentable(t), () -> randomFrom(DataType.types()))
@@ -28,7 +28,7 @@ public class FieldAttributeTests extends AbstractAttributeTestCase<FieldAttribut
         EsField field = AbstractEsFieldTypeTests.randomAnyEsField(maxDepth);
         Nullability nullability = randomFrom(Nullability.values());
         boolean synthetic = randomBoolean();
-        return newFieldAttributeWithType(source, parent, name, type, field, nullability, new NameId(), synthetic);
+        return newFieldAttributeWithType(source, parentName, name, type, field, nullability, new NameId(), synthetic);
     }
 
     @Override
@@ -39,20 +39,20 @@ public class FieldAttributeTests extends AbstractAttributeTestCase<FieldAttribut
     @Override
     protected FieldAttribute mutate(FieldAttribute instance) {
         Source source = instance.source();
-        FieldAttribute parent = instance.parent();
+        String parentName = instance.parentName();
         String name = instance.name();
         DataType type = instance.dataType();
         EsField field = instance.field();
         Nullability nullability = instance.nullable();
         boolean synthetic = instance.synthetic();
         switch (between(0, 5)) {
-            case 0 -> parent = randomValueOtherThan(parent, () -> randomBoolean() ? null : createFieldAttribute(2, false));
+            case 0 -> parentName = randomValueOtherThan(parentName, () -> randomBoolean() ? null : randomAlphaOfLength(2));
             case 1 -> name = randomAlphaOfLength(name.length() + 1);
             case 2 -> type = randomValueOtherThan(type, () -> randomFrom(DataType.types()));
             case 3 -> field = randomValueOtherThan(field, () -> AbstractEsFieldTypeTests.randomAnyEsField(3));
             case 4 -> nullability = randomValueOtherThan(nullability, () -> randomFrom(Nullability.values()));
             case 5 -> synthetic = false == synthetic;
         }
-        return newFieldAttributeWithType(source, parent, name, type, field, nullability, new NameId(), synthetic);
+        return newFieldAttributeWithType(source, parentName, name, type, field, nullability, new NameId(), synthetic);
     }
 }

+ 47 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/index/EsIndexSerializationTests.java

@@ -182,4 +182,51 @@ public class EsIndexSerializationTests extends AbstractWireSerializingTestCase<E
             assertThat(ByteSizeValue.ofBytes(out.bytes().length()), byteSizeEquals(expected));
         }
     }
+
+    public static EsIndex deeplyNestedIndex(int depth, int childrenPerLevel) {
+        String rootFieldName = "root";
+        Map<String, EsField> fields = Map.of(rootFieldName, fieldWithRecursiveChildren(depth, childrenPerLevel, rootFieldName));
+
+        return new EsIndex("deeply-nested", fields);
+    }
+
+    private static EsField fieldWithRecursiveChildren(int depth, int childrenPerLevel, String name) {
+        assert depth >= 1;
+
+        Map<String, EsField> children = new TreeMap<>();
+        String childName;
+        if (depth == 1) {
+            for (int i = 0; i < childrenPerLevel; i++) {
+                childName = "leaf" + i;
+                children.put(childName, new EsField(childName, DataType.KEYWORD, Map.of(), true));
+            }
+        } else {
+            for (int i = 0; i < childrenPerLevel; i++) {
+                childName = "level" + depth + "child" + i;
+                children.put(childName, fieldWithRecursiveChildren(depth - 1, childrenPerLevel, childName));
+            }
+        }
+
+        return new EsField(name, DataType.OBJECT, children, false);
+    }
+
+    /**
+     * Test de-/serialization and size on the wire for an index that has multiple levels of children:
+     * A single root with 9 children, each of which has 9 children etc. 6 levels deep.
+     */
+    public void testDeeplyNestedFields() throws IOException {
+        ByteSizeValue expectedSize = ByteSizeValue.ofBytes(9425494);
+        /*
+         * History:
+         *  9425494b - string serialization #112929
+         */
+
+        int depth = 6;
+        int childrenPerLevel = 9;
+
+        try (BytesStreamOutput out = new BytesStreamOutput(); var pso = new PlanStreamOutput(out, null)) {
+            deeplyNestedIndex(depth, childrenPerLevel).writeTo(pso);
+            assertThat(ByteSizeValue.ofBytes(out.bytes().length()), byteSizeEquals(expectedSize));
+        }
+    }
 }

+ 1 - 16
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutputTests.java

@@ -20,7 +20,6 @@ import org.elasticsearch.test.TransportVersionUtils;
 import org.elasticsearch.xpack.esql.Column;
 import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
-import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.NameId;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.EsField;
@@ -44,7 +43,6 @@ import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
 public class PlanStreamOutputTests extends ESTestCase {
@@ -118,26 +116,13 @@ public class PlanStreamOutputTests extends ESTestCase {
             for (int i = 0; i < occurrences; i++) {
                 planStream.writeNamedWriteable(attribute);
             }
-            int depth = 0;
-            Attribute parent = attribute;
-            while (parent != null) {
-                depth++;
-                parent = parent instanceof FieldAttribute f ? f.parent() : null;
-            }
-            assertThat(planStream.cachedAttributes.size(), is(depth));
+            assertThat(planStream.cachedAttributes.size(), is(1));
             try (PlanStreamInput in = new PlanStreamInput(out.bytes().streamInput(), REGISTRY, configuration)) {
                 Attribute first = in.readNamedWriteable(Attribute.class);
                 for (int i = 1; i < occurrences; i++) {
                     Attribute next = in.readNamedWriteable(Attribute.class);
                     assertThat(first, sameInstance(next));
                 }
-                for (int i = 0; i < depth; i++) {
-                    assertThat(first, equalTo(attribute));
-                    first = first instanceof FieldAttribute f ? f.parent() : null;
-                    attribute = attribute instanceof FieldAttribute f ? f.parent() : null;
-                }
-                assertThat(first, is(nullValue()));
-                assertThat(attribute, is(nullValue()));
             }
         }
     }

+ 59 - 8
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/ExchangeSinkExecSerializationTests.java

@@ -80,20 +80,67 @@ public class ExchangeSinkExecSerializationTests extends AbstractPhysicalPlanSeri
      * See {@link #testManyTypeConflicts(boolean, ByteSizeValue)} for more.
      */
     public void testManyTypeConflictsWithParent() throws IOException {
-        testManyTypeConflicts(true, ByteSizeValue.ofBytes(2774214));
+        testManyTypeConflicts(true, ByteSizeValue.ofBytes(2774192));
         /*
          * History:
          *  2 gb+ - start
          * 43.3mb - Cache attribute subclasses #111447
          *  5.6mb - shorten error messages for UnsupportedAttributes #111973
          *  3.1mb - cache EsFields #112008
-         *  2.6mb - string serialization #112929
+         *  2774214b - string serialization #112929
+         *  2774192b - remove field attribute #112881
          */
     }
 
+    private void testManyTypeConflicts(boolean withParent, ByteSizeValue expected) throws IOException {
+        EsIndex index = EsIndexSerializationTests.indexWithManyConflicts(withParent);
+        testSerializePlanWithIndex(index, expected);
+    }
+
+    /**
+     * Test the size of serializing a plan like
+     * FROM index | LIMIT 10
+     * with a single root field that has many children, grandchildren etc.
+     */
+    public void testDeeplyNestedFields() throws IOException {
+        ByteSizeValue expected = ByteSizeValue.ofBytes(47252411);
+        /*
+         * History:
+         *  48223371b - string serialization #112929
+         *  47252411b - remove field attribute #112881
+         */
+
+        int depth = 6;
+        int childrenPerLevel = 8;
+
+        EsIndex index = EsIndexSerializationTests.deeplyNestedIndex(depth, childrenPerLevel);
+        testSerializePlanWithIndex(index, expected);
+    }
+
     /**
-     * Test the size of serializing a plan with many conflicts. Callers of
-     * this method intentionally use a very precise size for the serialized
+     * Test the size of serializing a plan like
+     * FROM index | LIMIT 10 | KEEP one_single_field
+     * with a single root field that has many children, grandchildren etc.
+     */
+    public void testDeeplyNestedFieldsKeepOnlyOne() throws IOException {
+        ByteSizeValue expected = ByteSizeValue.ofBytes(9425806);
+        /*
+         * History:
+         *  9426058b - string serialization #112929
+         *  9425806b - remove field attribute #112881
+         */
+
+        int depth = 6;
+        int childrenPerLevel = 9;
+
+        EsIndex index = EsIndexSerializationTests.deeplyNestedIndex(depth, childrenPerLevel);
+        testSerializePlanWithIndex(index, expected, false);
+    }
+
+    /**
+     * Test the size of serializing the physical plan that will be sent to a data node.
+     * The plan corresponds to `FROM index | LIMIT 10`.
+     * Callers of this method intentionally use a very precise size for the serialized
      * data so a programmer making changes has to think when this size changes.
      * <p>
      *     In general, shrinking the over the wire size is great and the precise
@@ -108,10 +155,14 @@ public class ExchangeSinkExecSerializationTests extends AbstractPhysicalPlanSeri
      *     ESQL impossible to use at all for big mappings with many conflicts.
      * </p>
      */
-    private void testManyTypeConflicts(boolean withParent, ByteSizeValue expected) throws IOException {
-        EsIndex index = EsIndexSerializationTests.indexWithManyConflicts(withParent);
-        List<Attribute> attributes = Analyzer.mappingAsAttributes(randomSource(), index.mapping());
-        EsRelation relation = new EsRelation(randomSource(), index, attributes, IndexMode.STANDARD);
+    private void testSerializePlanWithIndex(EsIndex index, ByteSizeValue expected) throws IOException {
+        testSerializePlanWithIndex(index, expected, true);
+    }
+
+    private void testSerializePlanWithIndex(EsIndex index, ByteSizeValue expected, boolean keepAllFields) throws IOException {
+        List<Attribute> allAttributes = Analyzer.mappingAsAttributes(randomSource(), index.mapping());
+        List<Attribute> keepAttributes = keepAllFields ? allAttributes : List.of(allAttributes.get(0));
+        EsRelation relation = new EsRelation(randomSource(), index, keepAttributes, IndexMode.STANDARD);
         Limit limit = new Limit(randomSource(), new Literal(randomSource(), 10, DataType.INTEGER), relation);
         Project project = new Project(randomSource(), limit, limit.output());
         FragmentExec fragmentExec = new FragmentExec(project);