1
0
Эх сурвалжийг харах

ESQL: Drop all inefficient source serialiation (#111645)

This drops the last remaining usage of the very inefficient
serialization for `Source`. It was used by `SingleValueQuery` only, and
this migrates that to the same serialization that Source uses everywhere
else now.
Nik Everett 1 жил өмнө
parent
commit
0b18f711bd

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -185,6 +185,7 @@ public class TransportVersions {
     public static final TransportVersion ESQL_ATTRIBUTE_CACHED_SERIALIZATION = def(8_715_00_0);
     public static final TransportVersion REGISTER_SLM_STATS = def(8_716_00_0);
     public static final TransportVersion ESQL_NESTED_UNSUPPORTED = def(8_717_00_0);
+    public static final TransportVersion ESQL_SINGLE_VALUE_QUERY_SOURCE = def(8_718_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 0 - 47
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/SourceUtils.java

@@ -1,47 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.esql.core.util;
-
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.xpack.esql.core.tree.Location;
-import org.elasticsearch.xpack.esql.core.tree.Source;
-
-import java.io.IOException;
-
-public final class SourceUtils {
-
-    private SourceUtils() {}
-
-    /**
-     * Write a {@link Source} including the text in it.
-     * @deprecated replace with {@link Source#writeTo}.
-     *             That's not binary compatible so the replacement is complex.
-     */
-    @Deprecated
-    public static void writeSource(StreamOutput out, Source source) throws IOException {
-        out.writeInt(source.source().getLineNumber());
-        out.writeInt(source.source().getColumnNumber());
-        out.writeString(source.text());
-    }
-
-    /**
-     * Read a {@link Source} including the text in it.
-     * @deprecated replace with {@link Source#readFrom(StreamInput)}.
-     *             That's not binary compatible so the replacement is complex.
-     */
-    @Deprecated
-    public static Source readSource(StreamInput in) throws IOException {
-        int line = in.readInt();
-        int column = in.readInt();
-        int charPositionInLine = column - 1;
-
-        String text = in.readString();
-        return new Source(new Location(line, charPositionInLine), text);
-    }
-}

+ 30 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/querydsl/query/SingleValueQuery.java

@@ -24,15 +24,14 @@ import org.elasticsearch.index.query.QueryRewriteContext;
 import org.elasticsearch.index.query.SearchExecutionContext;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.esql.core.querydsl.query.Query;
+import org.elasticsearch.xpack.esql.core.tree.Location;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.expression.function.Warnings;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 
 import java.io.IOException;
 import java.util.Objects;
 
-import static org.elasticsearch.xpack.esql.core.util.SourceUtils.readSource;
-import static org.elasticsearch.xpack.esql.core.util.SourceUtils.writeSource;
-
 /**
  * Lucene query that wraps another query and only selects documents that match
  * the wrapped query <strong>and</strong> have a single field value.
@@ -107,11 +106,12 @@ public class SingleValueQuery extends Query {
             super(in);
             this.next = in.readNamedWriteable(QueryBuilder.class);
             this.field = in.readString();
-            if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
-                this.source = readSource(in);
+            if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_SINGLE_VALUE_QUERY_SOURCE)) {
+                this.source = Source.readFrom((PlanStreamInput) in);
+            } else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
+                this.source = readOldSource(in);
             } else {
                 this.source = Source.EMPTY;
-
             }
         }
 
@@ -119,8 +119,10 @@ public class SingleValueQuery extends Query {
         protected void doWriteTo(StreamOutput out) throws IOException {
             out.writeNamedWriteable(next);
             out.writeString(field);
-            if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
-                writeSource(out, source);
+            if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_SINGLE_VALUE_QUERY_SOURCE)) {
+                source.writeTo(out);
+            } else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
+                writeOldSource(out, source);
             }
         }
 
@@ -199,4 +201,24 @@ public class SingleValueQuery extends Query {
         }
     }
 
+    /**
+     * Write a {@link Source} including the text in it.
+     */
+    static void writeOldSource(StreamOutput out, Source source) throws IOException {
+        out.writeInt(source.source().getLineNumber());
+        out.writeInt(source.source().getColumnNumber());
+        out.writeString(source.text());
+    }
+
+    /**
+     * Read a {@link Source} including the text in it.
+     */
+    static Source readOldSource(StreamInput in) throws IOException {
+        int line = in.readInt();
+        int column = in.readInt();
+        int charPositionInLine = column - 1;
+
+        String text = in.readString();
+        return new Source(new Location(line, charPositionInLine), text);
+    }
 }

+ 19 - 15
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

@@ -237,11 +237,12 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
 
     // optimized doesn't know yet how to push down count over field
     public void testCountOneFieldWithFilter() {
-        var plan = plannerOptimizer.plan("""
+        String query = """
             from test
             | where salary > 1000
             | stats c = count(salary)
-            """, IS_SV_STATS);
+            """;
+        var plan = plannerOptimizer.plan(query, IS_SV_STATS);
 
         var limit = as(plan, LimitExec.class);
         var agg = as(limit.child(), AggregateExec.class);
@@ -255,7 +256,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         Source source = new Source(2, 8, "salary > 1000");
         var exists = QueryBuilders.existsQuery("salary");
         assertThat(stat.query(), is(exists));
-        var range = wrapWithSingleQuery(QueryBuilders.rangeQuery("salary").gt(1000), "salary", source);
+        var range = wrapWithSingleQuery(query, QueryBuilders.rangeQuery("salary").gt(1000), "salary", source);
         var expected = QueryBuilders.boolQuery().must(range).must(exists);
         assertThat(expected.toString(), is(esStatsQuery.query().toString()));
     }
@@ -346,11 +347,12 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
     }
 
     public void testAnotherCountAllWithFilter() {
-        var plan = plannerOptimizer.plan("""
+        String query = """
             from test
             | where emp_no > 10010
             | stats c = count()
-            """, IS_SV_STATS);
+            """;
+        var plan = plannerOptimizer.plan(query, IS_SV_STATS);
 
         var limit = as(plan, LimitExec.class);
         var agg = as(limit.child(), AggregateExec.class);
@@ -361,7 +363,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         assertThat(esStatsQuery.limit(), is(nullValue()));
         assertThat(Expressions.names(esStatsQuery.output()), contains("count", "seen"));
         var source = ((SingleValueQuery.Builder) esStatsQuery.query()).source();
-        var expected = wrapWithSingleQuery(QueryBuilders.rangeQuery("emp_no").gt(10010), "emp_no", source);
+        var expected = wrapWithSingleQuery(query, QueryBuilders.rangeQuery("emp_no").gt(10010), "emp_no", source);
         assertThat(expected.toString(), is(esStatsQuery.query().toString()));
     }
 
@@ -415,11 +417,12 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
      */
     public void testMatchCommandWithWhereClause() {
         assumeTrue("skipping because MATCH_COMMAND is not enabled", EsqlCapabilities.Cap.MATCH_COMMAND.isEnabled());
-        var plan = plannerOptimizer.plan("""
+        String queryText = """
             from test
             | where emp_no > 10010
             | match "last_name: Smith"
-            """, IS_SV_STATS);
+            """;
+        var plan = plannerOptimizer.plan(queryText, IS_SV_STATS);
 
         var limit = as(plan, LimitExec.class);
         var exchange = as(limit.child(), ExchangeExec.class);
@@ -429,7 +432,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         assertThat(query.limit().fold(), is(1000));
 
         Source source = new Source(2, 8, "emp_no > 10010");
-        var range = wrapWithSingleQuery(QueryBuilders.rangeQuery("emp_no").gt(10010), "emp_no", source);
+        var range = wrapWithSingleQuery(queryText, QueryBuilders.rangeQuery("emp_no").gt(10010), "emp_no", source);
         var queryString = QueryBuilders.queryStringQuery("last_name: Smith");
         var expected = QueryBuilders.boolQuery().must(range).must(queryString);
         assertThat(query.query().toString(), is(expected.toString()));
@@ -701,7 +704,7 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         assertThat(queryExec.limit().fold(), is(1000));
 
         var expectedInnerQuery = QueryBuilders.termsQuery(fieldName, cidrBlocks);
-        var expectedQuery = wrapWithSingleQuery(expectedInnerQuery, fieldName, new Source(1, 18, cidrMatch));
+        var expectedQuery = wrapWithSingleQuery(query, expectedInnerQuery, fieldName, new Source(1, 18, cidrMatch));
         assertThat(queryExec.query().toString(), is(expectedQuery.toString()));
     }
 
@@ -891,14 +894,15 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
     public void testMultipleMatchFilterPushdown() {
         assumeTrue("Match operator is available just for snapshots", Build.current().isSnapshot());
 
-        var plan = plannerOptimizer.plan("""
+        String query = """
             from test
             | where first_name match "Anna" OR first_name match "Anneke"
             | sort emp_no
             | where emp_no > 10000
             | eval description = concat("emp_no: ", to_str(emp_no), ", name: ", first_name, " ", last_name)
             | where last_name match "Xinglin"
-            """);
+            """;
+        var plan = plannerOptimizer.plan(query);
 
         var eval = as(plan, EvalExec.class);
         var topNExec = as(eval.child(), TopNExec.class);
@@ -911,13 +915,13 @@ public class LocalPhysicalPlanOptimizerTests extends MapperServiceTestCase {
         var expectedLuceneQuery = new BoolQueryBuilder().must(
             new BoolQueryBuilder().should(new MatchQueryBuilder("first_name", "Anna")).should(new MatchQueryBuilder("first_name", "Anneke"))
         )
-            .must(wrapWithSingleQuery(QueryBuilders.rangeQuery("emp_no").gt(10000), "emp_no", filterSource))
+            .must(wrapWithSingleQuery(query, QueryBuilders.rangeQuery("emp_no").gt(10000), "emp_no", filterSource))
             .must(new MatchQueryBuilder("last_name", "Xinglin"));
         assertThat(actualLuceneQuery.toString(), is(expectedLuceneQuery.toString()));
     }
 
-    private QueryBuilder wrapWithSingleQuery(QueryBuilder inner, String fieldName, Source source) {
-        return FilterTests.singleValueQuery(inner, fieldName, source);
+    private QueryBuilder wrapWithSingleQuery(String query, QueryBuilder inner, String fieldName, Source source) {
+        return FilterTests.singleValueQuery(query, inner, fieldName, source);
     }
 
     private Stat queryStatsFor(PhysicalPlan plan) {

+ 40 - 26
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

@@ -27,6 +27,9 @@ import org.elasticsearch.xpack.esql.core.util.Queries;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.index.EsIndex;
 import org.elasticsearch.xpack.esql.index.IndexResolution;
+import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
 import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
 import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
@@ -35,6 +38,7 @@ import org.elasticsearch.xpack.esql.parser.EsqlParser;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
+import org.elasticsearch.xpack.esql.session.Configuration;
 import org.junit.BeforeClass;
 
 import java.io.IOException;
@@ -45,6 +49,7 @@ import java.util.Set;
 
 import static java.util.Arrays.asList;
 import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
+import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomConfiguration;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
@@ -52,7 +57,6 @@ import static org.elasticsearch.xpack.esql.SerializationTestUtils.assertSerializ
 import static org.elasticsearch.xpack.esql.core.util.Queries.Clause.FILTER;
 import static org.elasticsearch.xpack.esql.core.util.Queries.Clause.MUST;
 import static org.elasticsearch.xpack.esql.core.util.Queries.Clause.SHOULD;
-import static org.elasticsearch.xpack.esql.core.util.SourceUtils.writeSource;
 import static org.hamcrest.Matchers.nullValue;
 
 public class FilterTests extends ESTestCase {
@@ -65,14 +69,13 @@ public class FilterTests extends ESTestCase {
     private static Analyzer analyzer;
     private static LogicalPlanOptimizer logicalOptimizer;
     private static PhysicalPlanOptimizer physicalPlanOptimizer;
-    private static Map<String, EsField> mapping;
     private static Mapper mapper;
 
     @BeforeClass
     public static void init() {
         parser = new EsqlParser();
 
-        mapping = loadMapping("mapping-basic.json");
+        Map<String, EsField> mapping = loadMapping("mapping-basic.json");
         EsIndex test = new EsIndex("test", mapping, Set.of("test"));
         IndexResolution getIndexResult = IndexResolution.valid(test);
         logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG));
@@ -100,13 +103,14 @@ public class FilterTests extends ESTestCase {
     public void testTimestampNoRequestFilterQueryFilter() {
         var value = 10;
 
-        var plan = plan(LoggerMessageFormat.format(null, """
+        String query = LoggerMessageFormat.format(null, """
              FROM test
             |WHERE {} > {}
-            """, EMP_NO, value), null);
+            """, EMP_NO, value);
+        var plan = plan(query, null);
 
         var filter = filterQueryForTransportNodes(plan);
-        var expected = singleValueQuery(rangeQuery(EMP_NO).gt(value), EMP_NO, ((SingleValueQuery.Builder) filter).source());
+        var expected = singleValueQuery(query, rangeQuery(EMP_NO).gt(value), EMP_NO, ((SingleValueQuery.Builder) filter).source());
         assertEquals(expected.toString(), filter.toString());
     }
 
@@ -114,14 +118,16 @@ public class FilterTests extends ESTestCase {
         var value = 10;
         var restFilter = restFilterQuery(EMP_NO);
 
-        var plan = plan(LoggerMessageFormat.format(null, """
+        String query = LoggerMessageFormat.format(null, """
              FROM test
             |WHERE {} > 10
-            """, EMP_NO, value), restFilter);
+            """, EMP_NO, value);
+        var plan = plan(query, restFilter);
 
         var filter = filterQueryForTransportNodes(plan);
         var builder = ((BoolQueryBuilder) filter).filter().get(1);
         var queryFilter = singleValueQuery(
+            query,
             rangeQuery(EMP_NO).gt(value).includeUpper(false),
             EMP_NO,
             ((SingleValueQuery.Builder) builder).source()
@@ -135,15 +141,16 @@ public class FilterTests extends ESTestCase {
         var highValue = 100;
         var restFilter = restFilterQuery(EMP_NO);
 
-        var plan = plan(LoggerMessageFormat.format(null, """
+        String query = LoggerMessageFormat.format(null, """
              FROM test
             |WHERE {} > {} AND {} < {}
-            """, EMP_NO, lowValue, EMP_NO, highValue), restFilter);
+            """, EMP_NO, lowValue, EMP_NO, highValue);
+        var plan = plan(query, restFilter);
 
         var filter = filterQueryForTransportNodes(plan);
         var musts = ((BoolQueryBuilder) ((BoolQueryBuilder) filter).filter().get(1)).must();
-        var left = singleValueQuery(rangeQuery(EMP_NO).gt(lowValue), EMP_NO, ((SingleValueQuery.Builder) musts.get(0)).source());
-        var right = singleValueQuery(rangeQuery(EMP_NO).lt(highValue), EMP_NO, ((SingleValueQuery.Builder) musts.get(1)).source());
+        var left = singleValueQuery(query, rangeQuery(EMP_NO).gt(lowValue), EMP_NO, ((SingleValueQuery.Builder) musts.get(0)).source());
+        var right = singleValueQuery(query, rangeQuery(EMP_NO).lt(highValue), EMP_NO, ((SingleValueQuery.Builder) musts.get(1)).source());
         var must = Queries.combine(MUST, asList(left, right));
         var expected = Queries.combine(FILTER, asList(restFilter, must));
         assertEquals(expected.toString(), filter.toString());
@@ -169,15 +176,16 @@ public class FilterTests extends ESTestCase {
         var highValue = 100;
         var restFilter = restFilterQuery(EMP_NO);
 
-        var plan = plan(LoggerMessageFormat.format(null, """
+        String query = LoggerMessageFormat.format(null, """
              FROM test
             |WHERE {} > {} OR {} < {}
-            """, EMP_NO, lowValue, EMP_NO, highValue), restFilter);
+            """, EMP_NO, lowValue, EMP_NO, highValue);
+        var plan = plan(query, restFilter);
 
         var filter = filterQueryForTransportNodes(plan);
         var shoulds = ((BoolQueryBuilder) ((BoolQueryBuilder) filter).filter().get(1)).should();
-        var left = singleValueQuery(rangeQuery(EMP_NO).gt(lowValue), EMP_NO, ((SingleValueQuery.Builder) shoulds.get(0)).source());
-        var right = singleValueQuery(rangeQuery(EMP_NO).lt(highValue), EMP_NO, ((SingleValueQuery.Builder) shoulds.get(1)).source());
+        var left = singleValueQuery(query, rangeQuery(EMP_NO).gt(lowValue), EMP_NO, ((SingleValueQuery.Builder) shoulds.get(0)).source());
+        var right = singleValueQuery(query, rangeQuery(EMP_NO).lt(highValue), EMP_NO, ((SingleValueQuery.Builder) shoulds.get(1)).source());
         var should = Queries.combine(SHOULD, asList(left, right));
         var expected = Queries.combine(FILTER, asList(restFilter, should));
         assertEquals(expected.toString(), filter.toString());
@@ -189,15 +197,16 @@ public class FilterTests extends ESTestCase {
         var eqValue = 1234;
         var restFilter = restFilterQuery(EMP_NO);
 
-        var plan = plan(LoggerMessageFormat.format(null, """
+        String query = LoggerMessageFormat.format(null, """
              FROM test
             |WHERE {} > {} AND {} == {} AND {} < {}
-            """, EMP_NO, lowValue, OTHER_FIELD, eqValue, EMP_NO, highValue), restFilter);
+            """, EMP_NO, lowValue, OTHER_FIELD, eqValue, EMP_NO, highValue);
+        var plan = plan(query, restFilter);
 
         var filter = filterQueryForTransportNodes(plan);
         var musts = ((BoolQueryBuilder) ((BoolQueryBuilder) filter).filter().get(1)).must();
-        var left = singleValueQuery(rangeQuery(EMP_NO).gt(lowValue), EMP_NO, ((SingleValueQuery.Builder) musts.get(0)).source());
-        var right = singleValueQuery(rangeQuery(EMP_NO).lt(highValue), EMP_NO, ((SingleValueQuery.Builder) musts.get(1)).source());
+        var left = singleValueQuery(query, rangeQuery(EMP_NO).gt(lowValue), EMP_NO, ((SingleValueQuery.Builder) musts.get(0)).source());
+        var right = singleValueQuery(query, rangeQuery(EMP_NO).lt(highValue), EMP_NO, ((SingleValueQuery.Builder) musts.get(1)).source());
         var must = Queries.combine(MUST, asList(left, right));
         var expected = Queries.combine(FILTER, asList(restFilter, must));
         assertEquals(expected.toString(), filter.toString());
@@ -210,16 +219,17 @@ public class FilterTests extends ESTestCase {
 
         var restFilter = restFilterQuery(EMP_NO);
 
-        var plan = plan(LoggerMessageFormat.format(null, """
+        String query = LoggerMessageFormat.format(null, """
              FROM test
             |WHERE {} > {}
             |EVAL {} = {}
             |WHERE {} > {}
-            """, EMP_NO, lowValue, EMP_NO, eqValue, EMP_NO, highValue), restFilter);
+            """, EMP_NO, lowValue, EMP_NO, eqValue, EMP_NO, highValue);
+        var plan = plan(query, restFilter);
 
         var filter = filterQueryForTransportNodes(plan);
         var builder = ((BoolQueryBuilder) filter).filter().get(1);
-        var queryFilter = singleValueQuery(rangeQuery(EMP_NO).gt(lowValue), EMP_NO, ((SingleValueQuery.Builder) builder).source());
+        var queryFilter = singleValueQuery(query, rangeQuery(EMP_NO).gt(lowValue), EMP_NO, ((SingleValueQuery.Builder) builder).source());
         var expected = Queries.combine(FILTER, asList(restFilter, queryFilter));
         assertEquals(expected.toString(), filter.toString());
     }
@@ -265,21 +275,25 @@ public class FilterTests extends ESTestCase {
      * Ugly hack to create a QueryBuilder for SingleValueQuery.
      * For some reason however the queryName is set to null on range queries when deserializing.
      */
-    public static QueryBuilder singleValueQuery(QueryBuilder inner, String field, Source source) {
+    public static QueryBuilder singleValueQuery(String query, QueryBuilder inner, String field, Source source) {
         try (BytesStreamOutput out = new BytesStreamOutput()) {
+            Configuration config = randomConfiguration(query, Map.of());
+
             // emulate SingleValueQuery writeTo
             out.writeFloat(AbstractQueryBuilder.DEFAULT_BOOST);
             out.writeOptionalString(null);
             out.writeNamedWriteable(inner);
             out.writeString(field);
-            writeSource(out, source);
+            source.writeTo(new PlanStreamOutput(out, new PlanNameRegistry(), config));
 
             StreamInput in = new NamedWriteableAwareStreamInput(
                 ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())),
                 SerializationTestUtils.writableRegistry()
             );
 
-            Object obj = SingleValueQuery.ENTRY.reader.read(in);
+            Object obj = SingleValueQuery.ENTRY.reader.read(
+                new PlanStreamInput(in, new PlanNameRegistry(), in.namedWriteableRegistry(), config)
+            );
             return (QueryBuilder) obj;
         } catch (IOException e) {
             throw new UncheckedIOException(e);

+ 37 - 6
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/querydsl/query/SingleValueQuerySerializationTests.java

@@ -7,16 +7,30 @@
 
 package org.elasticsearch.xpack.esql.querydsl.query;
 
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.TermQueryBuilder;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.AbstractWireTestCase;
 import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
+import org.elasticsearch.xpack.esql.session.Configuration;
+import org.junit.Before;
 
+import java.io.IOException;
 import java.util.List;
 
-public class SingleValueQuerySerializationTests extends AbstractWireSerializingTestCase<SingleValueQuery.Builder> {
+import static org.elasticsearch.xpack.esql.ConfigurationTestUtils.randomConfiguration;
+
+public class SingleValueQuerySerializationTests extends AbstractWireTestCase<SingleValueQuery.Builder> {
+    /**
+     * We use a single random config for all serialization because it's pretty
+     * heavy to build, especially in {@link #testConcurrentSerialization()}.
+     */
+    private Configuration config;
+
     @Override
     protected SingleValueQuery.Builder createTestInstance() {
         return new SingleValueQuery.Builder(randomQuery(), randomFieldName(), Source.EMPTY);
@@ -48,14 +62,31 @@ public class SingleValueQuerySerializationTests extends AbstractWireSerializingT
     }
 
     @Override
-    protected Writeable.Reader<SingleValueQuery.Builder> instanceReader() {
-        return SingleValueQuery.Builder::new;
+    protected final SingleValueQuery.Builder copyInstance(SingleValueQuery.Builder instance, TransportVersion version) throws IOException {
+        return copyInstance(
+            instance,
+            getNamedWriteableRegistry(),
+            (out, v) -> new PlanStreamOutput(out, new PlanNameRegistry(), config).writeNamedWriteable(v),
+            in -> {
+                PlanStreamInput pin = new PlanStreamInput(in, new PlanNameRegistry(), in.namedWriteableRegistry(), config);
+                return (SingleValueQuery.Builder) pin.readNamedWriteable(QueryBuilder.class);
+            },
+            version
+        );
     }
 
     @Override
     protected NamedWriteableRegistry getNamedWriteableRegistry() {
         return new NamedWriteableRegistry(
-            List.of(new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new))
+            List.of(
+                SingleValueQuery.ENTRY,
+                new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new)
+            )
         );
     }
+
+    @Before
+    public void initConfig() {
+        this.config = randomConfiguration();
+    }
 }