浏览代码

ESQL: Serialize the source in expressions (#99956)

When de-/serializing expressions, the source is currently omitted. This
is however required to correctly issue Warning headers. This adds
serialisation of the source.

The entire query is now added to the EsqlConfiguration and serialized
entirely and just once, part of it. This allows each expression to only
serialise source's length, since this, together with the Location,
allows reconstructing the source from config. The query string is also 
compressed, if exceeding a certain size (5K chars).

The Warning messages are also modified to always include the location of
the query whose evaluation generated an exception. This is required
because same exception can be generated from different query statements
and the Warning caching code deduplicates the headers.
Bogdan Pintea 2 年之前
父节点
当前提交
869aa86ca9
共有 42 个文件被更改,包括 532 次插入212 次删除
  1. 5 0
      docs/changelog/99956.yaml
  2. 2 2
      x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/100_bug_fix.yml
  3. 16 2
      x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
  4. 7 7
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/date.csv-spec
  5. 3 3
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/floats.csv-spec
  6. 47 10
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/ints.csv-spec
  7. 1 1
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec
  8. 24 24
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/math.csv-spec
  9. 4 4
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/row.csv-spec
  10. 14 10
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/Warnings.java
  11. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/DoubleConstantFunction.java
  12. 155 80
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java
  13. 47 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java
  14. 12 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java
  15. 2 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
  16. 45 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java
  17. 2 6
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java
  18. 6 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java
  19. 10 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java
  20. 2 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java
  21. 2 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/WarningsTests.java
  22. 3 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToIPTests.java
  23. 2 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AcosTests.java
  24. 2 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AsinTests.java
  25. 2 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/CoshTests.java
  26. 4 4
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/Log10Tests.java
  27. 5 5
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/PowTests.java
  28. 2 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/SinhTests.java
  29. 3 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/SqrtTests.java
  30. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/AbstractBinaryOperatorTestCase.java
  31. 2 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/arithmetic/NegTests.java
  32. 59 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInputTests.java
  33. 2 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java
  34. 2 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java
  35. 2 6
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
  36. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/ExpressionTests.java
  37. 11 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java
  38. 3 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java
  39. 2 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java
  40. 3 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java
  41. 3 4
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java
  42. 11 3
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlConfigurationSerializationTests.java

+ 5 - 0
docs/changelog/99956.yaml

@@ -0,0 +1,5 @@
+pr: 99956
+summary: "ESQL: Serialize the source in expressions"
+area: ES|QL
+type: enhancement
+issues: []

+ 2 - 2
x-pack/plugin/esql/qa/server/single-node/src/yamlRestTest/resources/rest-api-spec/test/100_bug_fix.yml

@@ -17,7 +17,7 @@ setup:
   - do:
       warnings:
         - "Line 1:37: evaluation of [to_ip(coalesce(ip1.keyword, \"255.255.255.255\"))] failed, treating result as null. Only first 20 failures recorded."
-        - "java.lang.IllegalArgumentException: '127.0' is not an IP string literal."
+        - "Line 1:37: java.lang.IllegalArgumentException: '127.0' is not an IP string literal."
       esql.query:
         body:
           query: 'FROM test | sort emp_no | eval ip = to_ip(coalesce(ip1.keyword, "255.255.255.255")) | keep emp_no, ip'
@@ -35,7 +35,7 @@ setup:
   - do:
       warnings:
         - "Line 1:98: evaluation of [to_ip(x2)] failed, treating result as null. Only first 20 failures recorded."
-        - "java.lang.IllegalArgumentException: '127.00.1' is not an IP string literal."
+        - "Line 1:98: java.lang.IllegalArgumentException: '127.00.1' is not an IP string literal."
       esql.query:
         body:
           query: 'FROM test | sort emp_no | eval x1 = concat(ip1, ip2), x2 = coalesce(x1, "255.255.255.255"), x3 = to_ip(x2) | keep emp_no, x*'

+ 16 - 2
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

@@ -12,11 +12,13 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.compute.data.BlockUtils;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
+import org.elasticsearch.xpack.esql.analysis.Verifier;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
+import org.elasticsearch.xpack.esql.stats.Metrics;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypeRegistry;
 import org.elasticsearch.xpack.ql.expression.Attribute;
@@ -27,6 +29,7 @@ import org.elasticsearch.xpack.ql.type.DataType;
 import org.elasticsearch.xpack.ql.type.DateUtils;
 import org.elasticsearch.xpack.ql.type.EsField;
 import org.elasticsearch.xpack.ql.type.TypesTests;
+import org.elasticsearch.xpack.ql.util.StringUtils;
 import org.junit.Assert;
 
 import java.util.ArrayList;
@@ -83,9 +86,11 @@ public final class EsqlTestUtils {
 
     public static final EsqlConfiguration TEST_CFG = configuration(new QueryPragmas(Settings.EMPTY));
 
+    public static final Verifier TEST_VERIFIER = new Verifier(new Metrics());
+
     private EsqlTestUtils() {}
 
-    public static EsqlConfiguration configuration(QueryPragmas pragmas) {
+    public static EsqlConfiguration configuration(QueryPragmas pragmas, String query) {
         return new EsqlConfiguration(
             DateUtils.UTC,
             Locale.US,
@@ -93,10 +98,19 @@ public final class EsqlTestUtils {
             null,
             pragmas,
             EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(Settings.EMPTY),
-            EsqlPlugin.QUERY_RESULT_TRUNCATION_DEFAULT_SIZE.getDefault(Settings.EMPTY)
+            EsqlPlugin.QUERY_RESULT_TRUNCATION_DEFAULT_SIZE.getDefault(Settings.EMPTY),
+            query
         );
     }
 
+    public static EsqlConfiguration configuration(QueryPragmas pragmas) {
+        return configuration(pragmas, StringUtils.EMPTY);
+    }
+
+    public static EsqlConfiguration configuration(String query) {
+        return configuration(new QueryPragmas(Settings.EMPTY), query);
+    }
+
     public static Literal L(Object value) {
         return of(value);
     }

+ 7 - 7
x-pack/plugin/esql/qa/testFixtures/src/main/resources/date.csv-spec

@@ -174,7 +174,7 @@ ROW string = ["1953-09-02T00:00:00.000Z", "1964-06-02T00:00:00.000Z", "1964-06-0
 // end::to_datetime-str[]
 ;
 warning:Line 2:19: evaluation of [TO_DATETIME(string)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.IllegalArgumentException: failed to parse date field [1964-06-02 00:00:00] with format [yyyy-MM-dd'T'HH:mm:ss.SSS'Z']
+warning:Line 2:19: java.lang.IllegalArgumentException: failed to parse date field [1964-06-02 00:00:00] with format [yyyy-MM-dd'T'HH:mm:ss.SSS'Z']
 
 // tag::to_datetime-str-result[]
 string:keyword                                          |datetime:date
@@ -185,7 +185,7 @@ string:keyword                                          |datetime:date
 convertFromUnsignedLong
 row ul = [9223372036854775808, 520128000000] | eval dt = to_datetime(ul);
 warning:Line 1:58: evaluation of [to_datetime(ul)] failed, treating result as null. Only first 20 failures recorded.
-warning:org.elasticsearch.xpack.ql.QlIllegalArgumentException: [9223372036854775808] out of [long] range
+warning:Line 1:58: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [9223372036854775808] out of [long] range
 
                 ul:ul              |           dt:date
 [9223372036854775808, 520128000000]|1986-06-26T00:00:00.000Z
@@ -361,7 +361,7 @@ b:datetime
 evalDateParseWrongDate
 row a = "2023-02-01 foo" | eval b = date_parse("yyyy-MM-dd", a) | keep b;
 warning:Line 1:37: evaluation of [date_parse(\"yyyy-MM-dd\", a)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.IllegalArgumentException: failed to parse date field [2023-02-01 foo] with format [yyyy-MM-dd]
+warning:Line 1:37: java.lang.IllegalArgumentException: failed to parse date field [2023-02-01 foo] with format [yyyy-MM-dd]
 
 b:datetime
 null
@@ -370,7 +370,7 @@ null
 evalDateParseNotMatching
 row a = "2023-02-01" | eval b = date_parse("yyyy-MM", a) | keep b;
 warning:Line 1:33: evaluation of [date_parse(\"yyyy-MM\", a)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.IllegalArgumentException: failed to parse date field [2023-02-01] with format [yyyy-MM]
+warning:Line 1:33: java.lang.IllegalArgumentException: failed to parse date field [2023-02-01] with format [yyyy-MM]
 b:datetime
 null
 ;
@@ -378,7 +378,7 @@ null
 evalDateParseNotMatching2
 row a = "2023-02-01" | eval b = date_parse("yyyy-MM-dd HH:mm:ss", a) | keep b;
 warning:Line 1:33: evaluation of [date_parse(\"yyyy-MM-dd HH:mm:ss\", a)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.IllegalArgumentException: failed to parse date field [2023-02-01] with format [yyyy-MM-dd HH:mm:ss]
+warning:Line 1:33: java.lang.IllegalArgumentException: failed to parse date field [2023-02-01] with format [yyyy-MM-dd HH:mm:ss]
 
 b:datetime
 null
@@ -638,7 +638,7 @@ row dt = to_dt(9223372036854775807)
 | keep plus;
 
 warning:Line 2:15: evaluation of [dt + 1 day] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: long overflow
+warning:Line 2:15: java.lang.ArithmeticException: long overflow
 
 plus:datetime
 null
@@ -650,7 +650,7 @@ row dt = to_dt(0)
 | keep plus;
 
 warning:Line 2:15: evaluation of [dt + 2147483647 years] failed, treating result as null. Only first 20 failures recorded.
-warning:java.time.DateTimeException: Invalid value for Year (valid values -999999999 - 999999999): 2147485617
+warning:Line 2:15: java.time.DateTimeException: Invalid value for Year (valid values -999999999 - 999999999): 2147485617
 
 plus:datetime
 null

+ 3 - 3
x-pack/plugin/esql/qa/testFixtures/src/main/resources/floats.csv-spec

@@ -62,7 +62,7 @@ ROW str1 = "5.20128E11", str2 = "foo"
 // end::to_double-str[]
 ;
 warning:Line 2:72: evaluation of [TO_DOUBLE(str2)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.NumberFormatException: For input string: \"foo\"
+warning:Line 2:72: java.lang.NumberFormatException: For input string: \"foo\"
 
 // tag::to_double-str-result[]
 str1:keyword |str2:keyword |dbl:double |dbl1:double |dbl2:double
@@ -274,7 +274,7 @@ ROW a=12.0
 | EVAL acos=ACOS(a)
 ;
 warning:Line 2:13: evaluation of [ACOS(a)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: Acos input out of range
+warning:Line 2:13: java.lang.ArithmeticException: Acos input out of range
 
 a:double | acos:double
       12 | null
@@ -324,7 +324,7 @@ ROW a=12.0
 | EVAL asin=ASIN(a)
 ;
 warning:Line 2:13: evaluation of [ASIN(a)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: Asin input out of range
+warning:Line 2:13: java.lang.ArithmeticException: Asin input out of range
 
 a:double | asin:double
       12 | null

+ 47 - 10
x-pack/plugin/esql/qa/testFixtures/src/main/resources/ints.csv-spec

@@ -66,7 +66,7 @@ long:long                    |ul:ul
 convertDoubleToUL
 row d = 123.4 | eval ul = to_ul(d), overflow = to_ul(1e20);
 warning:Line 1:48: evaluation of [to_ul(1e20)] failed, treating result as null. Only first 20 failures recorded.
-warning:org.elasticsearch.xpack.ql.QlIllegalArgumentException: [1.0E20] out of [unsigned_long] range
+warning:Line 1:48: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [1.0E20] out of [unsigned_long] range
 
 d:double       |ul:ul      |overflow:ul
 123.4          |123        |null
@@ -95,7 +95,7 @@ ROW str1 = "2147483648", str2 = "2147483648.2", str3 = "foo"
 // end::to_unsigned_long-str[]
 ;
 warning:Line 2:72: evaluation of [TO_UL(str3)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.NumberFormatException: Character f is neither a decimal digit number, decimal point, nor \"e\" notation exponential mark.
+warning:Line 2:72: java.lang.NumberFormatException: Character f is neither a decimal digit number, decimal point, nor \"e\" notation exponential mark.
 
 
 // tag::to_unsigned_long-str-result[]
@@ -123,7 +123,7 @@ int:integer       |long:long
 convertULToLong
 row ul = [9223372036854775807, 9223372036854775808] | eval long = to_long(ul);
 warning:Line 1:67: evaluation of [to_long(ul)] failed, treating result as null. Only first 20 failures recorded.
-warning:org.elasticsearch.xpack.ql.QlIllegalArgumentException: [9223372036854775808] out of [long] range
+warning:Line 1:67: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [9223372036854775808] out of [long] range
 
                     ul:ul                 |       long:long
 [9223372036854775807, 9223372036854775808]|9223372036854775807
@@ -152,7 +152,7 @@ ROW str1 = "2147483648", str2 = "2147483648.2", str3 = "foo"
 // end::to_long-str[]
 ;
 warning:Line 2:62: evaluation of [TO_LONG(str3)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.NumberFormatException: For input string: \"foo\"
+warning:Line 2:62: java.lang.NumberFormatException: For input string: \"foo\"
 
 
 // tag::to_long-str-result[]
@@ -164,7 +164,7 @@ str1:keyword |str2:keyword |str3:keyword |long1:long  |long2:long |long3:long
 convertDoubleToLong
 row d = 123.4 | eval d2l = to_long(d), overflow = to_long(1e19);
 warning:Line 1:51: evaluation of [to_long(1e19)] failed, treating result as null. Only first 20 failures recorded.
-warning:org.elasticsearch.xpack.ql.QlIllegalArgumentException: [1.0E19] out of [long] range
+warning:Line 1:51: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [1.0E19] out of [long] range
 
 d:double       |d2l:long      |overflow:long     
 123.4          |123           |null    
@@ -186,7 +186,7 @@ ROW long = [5013792, 2147483647, 501379200000]
 // end::to_int-long[]
 ;
 warning:Line 2:14: evaluation of [TO_INTEGER(long)] failed, treating result as null. Only first 20 failures recorded.
-warning:org.elasticsearch.xpack.ql.QlIllegalArgumentException: [501379200000] out of [integer] range
+warning:Line 2:14: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [501379200000] out of [integer] range
 
 // tag::to_int-long-result[]
 long:long                           |int:integer
@@ -198,7 +198,7 @@ convertULToInt
 row ul = [2147483647, 9223372036854775808] | eval int = to_int(ul);
 warning:Line 1:57: evaluation of [to_int(ul)] failed, treating result as null. Only first 20 failures recorded.
 // UL conversion to int dips into long; not the most efficient, but it's how SQL does it too.
-warning:org.elasticsearch.xpack.ql.QlIllegalArgumentException: [9223372036854775808] out of [long] range
+warning:Line 1:57: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [9223372036854775808] out of [long] range
 
                     ul:ul        |int:integer
 [2147483647, 9223372036854775808]|2147483647
@@ -221,9 +221,9 @@ tf:boolean     |t2i:integer    |f2i:integer    |tf2i:integer
 convertStringToInt
 row int_str = "2147483647", int_dbl_str = "2147483647.2" | eval is2i = to_integer(int_str), ids2i = to_integer(int_dbl_str), overflow = to_integer("2147483648"), no_number = to_integer("foo");
 warning:Line 1:137: evaluation of [to_integer(\"2147483648\")] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.NumberFormatException: For input string: \"2147483648\"
+warning:Line 1:137: java.lang.NumberFormatException: For input string: \"2147483648\"
 warning:Line 1:175: evaluation of [to_integer(\"foo\")] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.NumberFormatException: For input string: \"foo\"
+warning:Line 1:175: java.lang.NumberFormatException: For input string: \"foo\"
 
 int_str:keyword  |int_dbl_str:keyword |is2i:integer|ids2i:integer   |overflow:integer |no_number:integer
 2147483647       |2147483647.2        |2147483647  |2147483647      |null             |null
@@ -232,7 +232,7 @@ int_str:keyword  |int_dbl_str:keyword |is2i:integer|ids2i:integer   |overflow:in
 convertDoubleToInt
 row d = 123.4 | eval d2i = to_integer(d), overflow = to_integer(1e19);
 warning:Line 1:54: evaluation of [to_integer(1e19)] failed, treating result as null. Only first 20 failures recorded.
-warning:org.elasticsearch.xpack.ql.QlIllegalArgumentException: [1.0E19] out of [long] range
+warning:Line 1:54: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [1.0E19] out of [long] range
 
 d:double       |d2i:integer   |overflow:integer
 123.4          |123           |null    
@@ -472,3 +472,40 @@ ROW deg = [90, 180, 270]
    deg:integer | rad:double
 [90, 180, 270] | [1.5707963267948966, 3.141592653589793, 4.71238898038469]
 ;
+
+warningWithFromSource
+from employees | eval x = to_long(emp_no) * 10000000 | eval y = to_int(x) > 1 | keep y | limit 1;
+warning:Line 1:65: evaluation of [to_int(x)] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 1:65: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100010000000] out of [integer] range
+
+y:boolean
+null
+;
+
+// the test is also notable through having the "failing" operation in the filter, which will be part of the fragment sent to a data node
+multipleWarnings
+from employees | sort emp_no | eval x = to_long(emp_no) * 10000000 | where to_int(x) > 1 | keep x | limit 1;
+warning:Line 1:76: evaluation of [to_int(x)] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100010000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100020000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100030000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100040000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100050000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100060000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100070000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100080000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100090000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100100000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100110000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100120000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100130000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100140000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100150000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100160000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100170000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100180000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100190000000] out of [integer] range
+warning:Line 1:76: org.elasticsearch.xpack.ql.QlIllegalArgumentException: [100200000000] out of [integer] range
+
+x:long
+;

+ 1 - 1
x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec

@@ -204,7 +204,7 @@ ROW str1 = "1.1.1.1", str2 = "foo"
 // end::to_ip[]
 ;
 warning:Line 2:33: evaluation of [TO_IP(str2)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.IllegalArgumentException: 'foo' is not an IP string literal.
+warning:Line 2:33: java.lang.IllegalArgumentException: 'foo' is not an IP string literal.
 
 // tag::to_ip-result[]
 str1:keyword |str2:keyword |ip1:ip  |ip2:ip

+ 24 - 24
x-pack/plugin/esql/qa/testFixtures/src/main/resources/math.csv-spec

@@ -18,7 +18,7 @@ addLongOverflow
 row max = 9223372036854775807 | eval sum = max + 1 | keep sum;
 
 warning:Line 1:44: evaluation of [max + 1] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: long overflow
+warning:Line 1:44: java.lang.ArithmeticException: long overflow
 
 sum:long
 null
@@ -28,7 +28,7 @@ subLongUnderflow
 row l = -9223372036854775807 | eval sub = l - 2 | keep sub;
 
 warning:Line 1:43: evaluation of [l - 2] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: long overflow
+warning:Line 1:43: java.lang.ArithmeticException: long overflow
 
 sub:long
 null
@@ -38,7 +38,7 @@ mulLongOverflow
 row max = 9223372036854775807 | eval mul = max * 2 | keep mul;
 
 warning:Line 1:44: evaluation of [max * 2] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: long overflow
+warning:Line 1:44: java.lang.ArithmeticException: long overflow
 
 mul:long
 null
@@ -48,7 +48,7 @@ divLongByZero
 row max = 9223372036854775807 | eval div = max / 0 | keep div;
 
 warning:Line 1:44: evaluation of [max / 0] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: / by zero
+warning:Line 1:44: java.lang.ArithmeticException: / by zero
 
 div:long
 null
@@ -59,7 +59,7 @@ row max = 9223372036854775807 | eval mod = max % 0 | keep mod;
 
 // ascii(%) == %25
 warning:Line 1:44: evaluation of [max %25 0] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: / by zero
+warning:Line 1:44: java.lang.ArithmeticException: / by zero
 
 mod:long
 null
@@ -140,7 +140,7 @@ negateIntOverflow
 // Negating Integer.MIN_VALUE overflows.
 row x=-2147483648 | eval a = -x;
 warning:Line 1:30: evaluation of [-x] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: integer overflow
+warning:Line 1:30: java.lang.ArithmeticException: integer overflow
 
 x:integer   | a:integer
 -2147483648 | null
@@ -150,7 +150,7 @@ negateLongOverflow
 // Negating Long.MIN_VALUE overflows.
 row x=-9223372036854775808 | eval a = -x;
 warning:Line 1:39: evaluation of [-x] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: long overflow
+warning:Line 1:39: java.lang.ArithmeticException: long overflow
 
 x:long               | a:long
 -9223372036854775808 | null
@@ -268,7 +268,7 @@ d: double | s:double
 log10ofNegative
 row d = -1.0 | eval s = log10(d);
 warning:Line 1:25: evaluation of [log10(d)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: Log of non-positive number
+warning:Line 1:25: java.lang.ArithmeticException: Log of non-positive number
 
 d:double | s:double
 -1.0     | null
@@ -277,7 +277,7 @@ d:double | s:double
 log10ofZero
 row d = 0.0 | eval s = log10(d);
 warning:Line 1:24: evaluation of [log10(d)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: Log of non-positive number
+warning:Line 1:24: java.lang.ArithmeticException: Log of non-positive number
 
 d:double | s:double
 0.0     | null
@@ -286,7 +286,7 @@ d:double | s:double
 log10ofNegativeZero
 row d = -0.0 | eval s = log10(d);
 warning:Line 1:25: evaluation of [log10(d)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: Log of non-positive number
+warning:Line 1:25: java.lang.ArithmeticException: Log of non-positive number
 
 d:double | s:double
 -0.0     | null
@@ -335,7 +335,7 @@ ROW base = -4, exponent = 0.5
 // end::powNeg-sqrt[]
 ;
 warning:Line 2:12: evaluation of [POW(base, exponent)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: invalid result: pow(-4.0, 0.5)
+warning:Line 2:12: java.lang.ArithmeticException: invalid result: pow(-4.0, 0.5)
 
 // tag::powNeg-sqrt-result[]
 base:integer | exponent:double | s:double
@@ -414,7 +414,7 @@ x:double
 powIntULOverrun
 row x = pow(2, 9223372036854775808);
 warning:Line 1:9: evaluation of [pow(2, 9223372036854775808)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: long overflow
+warning:Line 1:9: java.lang.ArithmeticException: long overflow
 
 x:long
 null
@@ -434,7 +434,7 @@ ROW x = POW(9223372036854775808, 2)
 ;
 // tag::powULOverrun-warning[]
 warning:Line 1:9: evaluation of [POW(9223372036854775808, 2)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: long overflow
+warning:Line 1:9: java.lang.ArithmeticException: long overflow
 // end::powULOverrun-warning[]
 
 // tag::powULOverrun-result[]
@@ -465,7 +465,7 @@ x:long
 powULLongOverrun
 row x = to_long(100) | eval x = pow(to_unsigned_long(10), x);
 warning:Line 1:33: evaluation of [pow(to_unsigned_long(10), x)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: long overflow
+warning:Line 1:33: java.lang.ArithmeticException: long overflow
 
 x:long
 null
@@ -699,7 +699,7 @@ ROW ints = [0, 1, 2147483647]
 | KEEP mvsum;
 
 warning:Line 2:16: evaluation of [mv_sum(ints)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: integer overflow
+warning:Line 2:16: java.lang.ArithmeticException: integer overflow
 
 mvsum:integer
 null
@@ -711,7 +711,7 @@ ROW longs = [0, 1, 9223372036854775807]
 | KEEP mvsum;
 
 warning:Line 2:16: evaluation of [mv_sum(longs)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: long overflow
+warning:Line 2:16: java.lang.ArithmeticException: long overflow
 
 mvsum:long
 null
@@ -723,7 +723,7 @@ ROW ulongs = [0, 1, 18446744073709551615]
 | KEEP mvsum;
 
 warning:Line 2:16: evaluation of [mv_sum(ulongs)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: unsigned_long overflow
+warning:Line 2:16: java.lang.ArithmeticException: unsigned_long overflow
 
 mvsum:unsigned_long
 null
@@ -777,7 +777,7 @@ ulAdditionOverflow
 row x = 18446744073709551615, y = to_ul(1) | eval x = x + y | keep x;
 
 warning:Line 1:55: evaluation of [x + y] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: unsigned_long overflow
+warning:Line 1:55: java.lang.ArithmeticException: unsigned_long overflow
 
 x:ul
 null
@@ -808,7 +808,7 @@ ulSubtractionUnderflow
 row x = to_ul(0), y = to_ul(1) | eval x = x - y | keep x;
 
 warning:Line 1:43: evaluation of [x - y] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: unsigned_long overflow
+warning:Line 1:43: java.lang.ArithmeticException: unsigned_long overflow
 
 x:ul
 null
@@ -825,7 +825,7 @@ ulMultiplicationOverflow
 row x = 9223372036854775808, two = to_ul(2) | eval times2 = x * two | keep times2;
 
 warning:Line 1:61: evaluation of [x * two] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: unsigned_long overflow
+warning:Line 1:61: java.lang.ArithmeticException: unsigned_long overflow
 
 times2:ul
 null
@@ -835,7 +835,7 @@ ulMultiplicationOverflow2
 row x = 9223372036854775808, y = 9223372036854775809 | eval x = x * y | keep x;
 
 warning:Line 1:65: evaluation of [x * y] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: unsigned_long overflow
+warning:Line 1:65: java.lang.ArithmeticException: unsigned_long overflow
 
 x:ul
 null
@@ -852,7 +852,7 @@ ulDivisionByZero
 row halfplus = 9223372036854775808, zero = to_ul(0) | eval div = halfplus / zero | keep div;
 
 warning:Line 1:66: evaluation of [halfplus / zero] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: / by zero
+warning:Line 1:66: java.lang.ArithmeticException: / by zero
 
 div:ul
 null
@@ -870,7 +870,7 @@ row halfplus = 9223372036854775808, zero = to_ul(0) | eval mod = halfplus % zero
 
 // ascii(%) == %25
 warning:Line 1:66: evaluation of [halfplus %25 zero] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: / by zero
+warning:Line 1:66: java.lang.ArithmeticException: / by zero
 
 mod:ul
 null
@@ -1021,7 +1021,7 @@ l:ul                  | s:double
 sqrtOfNegative
 row d = -1.0 | eval s = sqrt(d);
 warning:Line 1:25: evaluation of [sqrt(d)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.ArithmeticException: Square root of negative
+warning:Line 1:25: java.lang.ArithmeticException: Square root of negative
 
 d:double | s:double
 -1.0     | null

+ 4 - 4
x-pack/plugin/esql/qa/testFixtures/src/main/resources/row.csv-spec

@@ -292,7 +292,7 @@ a:integer |b:integer |c:integer
 convertMvToMvDifferentCardinality
 row strings = ["1", "2", "three"] | eval ints = to_int(strings);
 warning:Line 1:49: evaluation of [to_int(strings)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.NumberFormatException: For input string: \"three\"
+warning:Line 1:49: java.lang.NumberFormatException: For input string: \"three\"
 
 strings:keyword |ints:integer
 [1, 2, three]   |[1, 2]
@@ -302,7 +302,7 @@ convertMvToSv
 row strings = ["1", "two"] | eval ints = to_int(strings);
 
 warning:Line 1:42: evaluation of [to_int(strings)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.NumberFormatException: For input string: \"two\"
+warning:Line 1:42: java.lang.NumberFormatException: For input string: \"two\"
 
 strings:keyword |ints:integer
 [1, two]        |1
@@ -311,8 +311,8 @@ strings:keyword |ints:integer
 convertMvToNull
 row strings = ["one", "two"] | eval ints = to_int(strings);
 warning:Line 1:44: evaluation of [to_int(strings)] failed, treating result as null. Only first 20 failures recorded.
-warning:java.lang.NumberFormatException: For input string: \"one\"
-warning:java.lang.NumberFormatException: For input string: \"two\"
+warning:Line 1:44: java.lang.NumberFormatException: For input string: \"one\"
+warning:Line 1:44: java.lang.NumberFormatException: For input string: \"two\"
 
 strings:keyword |ints:integer
 [one, two]      |null

+ 14 - 10
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/Warnings.java

@@ -10,6 +10,7 @@ package org.elasticsearch.xpack.esql.expression.function;
 import org.elasticsearch.xpack.ql.tree.Source;
 
 import static org.elasticsearch.common.logging.HeaderWarning.addWarning;
+import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
 
 /**
  * Utilities to collect warnings for running an executor.
@@ -17,26 +18,29 @@ import static org.elasticsearch.common.logging.HeaderWarning.addWarning;
 public class Warnings {
     static final int MAX_ADDED_WARNINGS = 20;
 
-    private final Source source;
+    private final String location;
+    private final String first;
 
     private int addedWarnings;
 
     public Warnings(Source source) {
-        this.source = source;
+        location = format("Line {}:{}: ", source.source().getLineNumber(), source.source().getColumnNumber());
+        first = format(
+            null,
+            "{}evaluation of [{}] failed, treating result as null. Only first {} failures recorded.",
+            location,
+            source.text(),
+            MAX_ADDED_WARNINGS
+        );
     }
 
     public void registerException(Exception exception) {
         if (addedWarnings < MAX_ADDED_WARNINGS) {
             if (addedWarnings == 0) {
-                addWarning(
-                    "Line {}:{}: evaluation of [{}] failed, treating result as null. Only first {} failures recorded.",
-                    source.source().getLineNumber(),
-                    source.source().getColumnNumber(),
-                    source.text(),
-                    MAX_ADDED_WARNINGS
-                );
+                addWarning(first);
             }
-            addWarning(exception.getClass().getName() + ": " + exception.getMessage());
+            // location needs to be added to the exception too, since the headers are deduplicated
+            addWarning(location + exception.getClass().getName() + ": " + exception.getMessage());
             addedWarnings++;
         }
     }

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/DoubleConstantFunction.java

@@ -16,7 +16,7 @@ import org.elasticsearch.xpack.ql.type.DataType;
 import org.elasticsearch.xpack.ql.type.DataTypes;
 
 /**
- * Function that emits Euler's number.
+ * Function that emits constants, like Euler's number.
  */
 public abstract class DoubleConstantFunction extends ScalarFunction {
     protected DoubleConstantFunction(Source source) {

+ 155 - 80
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

@@ -219,7 +219,7 @@ public final class PlanNamedTypes {
 
     /**
      * List of named type entries that link concrete names to stream reader and writer implementations.
-     * Entries have the form;  category,  name,  serializer method,  deserializer method.
+     * Entries have the form:  category,  name,  serializer method,  deserializer method.
      */
     public static List<PlanNameRegistry.Entry> namedTypeEntries() {
         return List.of(
@@ -388,7 +388,7 @@ public final class PlanNamedTypes {
     // -- physical plan nodes
     static AggregateExec readAggregateExec(PlanStreamInput in) throws IOException {
         return new AggregateExec(
-            Source.EMPTY,
+            in.readSource(),
             in.readPhysicalPlanNode(),
             in.readCollectionAsList(readerFromPlanReader(PlanStreamInput::readExpression)),
             readNamedExpressions(in),
@@ -398,6 +398,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeAggregateExec(PlanStreamOutput out, AggregateExec aggregateExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(aggregateExec.child());
         out.writeCollection(aggregateExec.groupings(), writerFromPlanWriter(PlanStreamOutput::writeExpression));
         writeNamedExpressions(out, aggregateExec.aggregates());
@@ -406,10 +407,11 @@ public final class PlanNamedTypes {
     }
 
     static DissectExec readDissectExec(PlanStreamInput in) throws IOException {
-        return new DissectExec(Source.EMPTY, in.readPhysicalPlanNode(), in.readExpression(), readDissectParser(in), readAttributes(in));
+        return new DissectExec(in.readSource(), in.readPhysicalPlanNode(), in.readExpression(), readDissectParser(in), readAttributes(in));
     }
 
     static void writeDissectExec(PlanStreamOutput out, DissectExec dissectExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(dissectExec.child());
         out.writeExpression(dissectExec.inputExpression());
         writeDissectParser(out, dissectExec.parser());
@@ -418,7 +420,7 @@ public final class PlanNamedTypes {
 
     static EsQueryExec readEsQueryExec(PlanStreamInput in) throws IOException {
         return new EsQueryExec(
-            Source.EMPTY,
+            in.readSource(),
             readEsIndex(in),
             readAttributes(in),
             in.readOptionalNamedWriteable(QueryBuilder.class),
@@ -430,6 +432,7 @@ public final class PlanNamedTypes {
 
     static void writeEsQueryExec(PlanStreamOutput out, EsQueryExec esQueryExec) throws IOException {
         assert esQueryExec.children().size() == 0;
+        out.writeNoSource();
         writeEsIndex(out, esQueryExec.index());
         writeAttributes(out, esQueryExec.output());
         out.writeOptionalNamedWriteable(esQueryExec.query());
@@ -439,27 +442,29 @@ public final class PlanNamedTypes {
     }
 
     static EsSourceExec readEsSourceExec(PlanStreamInput in) throws IOException {
-        return new EsSourceExec(Source.EMPTY, readEsIndex(in), readAttributes(in), in.readOptionalNamedWriteable(QueryBuilder.class));
+        return new EsSourceExec(in.readSource(), readEsIndex(in), readAttributes(in), in.readOptionalNamedWriteable(QueryBuilder.class));
     }
 
     static void writeEsSourceExec(PlanStreamOutput out, EsSourceExec esSourceExec) throws IOException {
+        out.writeNoSource();
         writeEsIndex(out, esSourceExec.index());
         writeAttributes(out, esSourceExec.output());
         out.writeOptionalNamedWriteable(esSourceExec.query());
     }
 
     static EvalExec readEvalExec(PlanStreamInput in) throws IOException {
-        return new EvalExec(Source.EMPTY, in.readPhysicalPlanNode(), readAliases(in));
+        return new EvalExec(in.readSource(), in.readPhysicalPlanNode(), readAliases(in));
     }
 
     static void writeEvalExec(PlanStreamOutput out, EvalExec evalExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(evalExec.child());
         writeAliases(out, evalExec.fields());
     }
 
     static EnrichExec readEnrichExec(PlanStreamInput in) throws IOException {
         return new EnrichExec(
-            Source.EMPTY,
+            in.readSource(),
             in.readPhysicalPlanNode(),
             in.readNamedExpression(),
             in.readString(),
@@ -470,6 +475,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeEnrichExec(PlanStreamOutput out, EnrichExec enrich) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(enrich.child());
         out.writeNamedExpression(enrich.matchField());
         out.writeString(enrich.policyName());
@@ -479,26 +485,28 @@ public final class PlanNamedTypes {
     }
 
     static ExchangeExec readExchangeExec(PlanStreamInput in) throws IOException {
-        return new ExchangeExec(Source.EMPTY, readAttributes(in), in.readBoolean(), in.readPhysicalPlanNode());
+        return new ExchangeExec(in.readSource(), readAttributes(in), in.readBoolean(), in.readPhysicalPlanNode());
     }
 
     static void writeExchangeExec(PlanStreamOutput out, ExchangeExec exchangeExec) throws IOException {
+        out.writeNoSource();
         writeAttributes(out, exchangeExec.output());
         out.writeBoolean(exchangeExec.isInBetweenAggs());
         out.writePhysicalPlanNode(exchangeExec.child());
     }
 
     static ExchangeSinkExec readExchangeSinkExec(PlanStreamInput in) throws IOException {
-        return new ExchangeSinkExec(Source.EMPTY, readAttributes(in), in.readPhysicalPlanNode());
+        return new ExchangeSinkExec(in.readSource(), readAttributes(in), in.readPhysicalPlanNode());
     }
 
     static void writeExchangeSinkExec(PlanStreamOutput out, ExchangeSinkExec exchangeSinkExec) throws IOException {
+        out.writeNoSource();
         writeAttributes(out, exchangeSinkExec.output());
         out.writePhysicalPlanNode(exchangeSinkExec.child());
     }
 
     static ExchangeSourceExec readExchangeSourceExec(PlanStreamInput in) throws IOException {
-        return new ExchangeSourceExec(Source.EMPTY, readAttributes(in), in.readBoolean());
+        return new ExchangeSourceExec(in.readSource(), readAttributes(in), in.readBoolean());
     }
 
     static void writeExchangeSourceExec(PlanStreamOutput out, ExchangeSourceExec exchangeSourceExec) throws IOException {
@@ -507,26 +515,28 @@ public final class PlanNamedTypes {
     }
 
     static FieldExtractExec readFieldExtractExec(PlanStreamInput in) throws IOException {
-        return new FieldExtractExec(Source.EMPTY, in.readPhysicalPlanNode(), readAttributes(in));
+        return new FieldExtractExec(in.readSource(), in.readPhysicalPlanNode(), readAttributes(in));
     }
 
     static void writeFieldExtractExec(PlanStreamOutput out, FieldExtractExec fieldExtractExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(fieldExtractExec.child());
         writeAttributes(out, fieldExtractExec.attributesToExtract());
     }
 
     static FilterExec readFilterExec(PlanStreamInput in) throws IOException {
-        return new FilterExec(Source.EMPTY, in.readPhysicalPlanNode(), in.readExpression());
+        return new FilterExec(in.readSource(), in.readPhysicalPlanNode(), in.readExpression());
     }
 
     static void writeFilterExec(PlanStreamOutput out, FilterExec filterExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(filterExec.child());
         out.writeExpression(filterExec.condition());
     }
 
     static FragmentExec readFragmentExec(PlanStreamInput in) throws IOException {
         return new FragmentExec(
-            Source.EMPTY,
+            in.readSource(),
             in.readLogicalPlanNode(),
             in.readOptionalNamedWriteable(QueryBuilder.class),
             in.readOptionalVInt()
@@ -534,22 +544,25 @@ public final class PlanNamedTypes {
     }
 
     static void writeFragmentExec(PlanStreamOutput out, FragmentExec fragmentExec) throws IOException {
+        out.writeNoSource();
         out.writeLogicalPlanNode(fragmentExec.fragment());
         out.writeOptionalNamedWriteable(fragmentExec.esFilter());
         out.writeOptionalVInt(fragmentExec.estimatedRowSize());
     }
 
     static GrokExec readGrokExec(PlanStreamInput in) throws IOException {
+        Source source;
         return new GrokExec(
-            Source.EMPTY,
+            source = in.readSource(),
             in.readPhysicalPlanNode(),
             in.readExpression(),
-            Grok.pattern(Source.EMPTY, in.readString()),
+            Grok.pattern(source, in.readString()),
             readAttributes(in)
         );
     }
 
     static void writeGrokExec(PlanStreamOutput out, GrokExec grokExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(grokExec.child());
         out.writeExpression(grokExec.inputExpression());
         out.writeString(grokExec.pattern().pattern());
@@ -557,67 +570,73 @@ public final class PlanNamedTypes {
     }
 
     static LimitExec readLimitExec(PlanStreamInput in) throws IOException {
-        return new LimitExec(Source.EMPTY, in.readPhysicalPlanNode(), in.readNamed(Expression.class));
+        return new LimitExec(in.readSource(), in.readPhysicalPlanNode(), in.readNamed(Expression.class));
     }
 
     static void writeLimitExec(PlanStreamOutput out, LimitExec limitExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(limitExec.child());
         out.writeExpression(limitExec.limit());
     }
 
     static MvExpandExec readMvExpandExec(PlanStreamInput in) throws IOException {
-        return new MvExpandExec(Source.EMPTY, in.readPhysicalPlanNode(), in.readNamedExpression());
+        return new MvExpandExec(in.readSource(), in.readPhysicalPlanNode(), in.readNamedExpression());
     }
 
     static void writeMvExpandExec(PlanStreamOutput out, MvExpandExec mvExpandExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(mvExpandExec.child());
         out.writeNamedExpression(mvExpandExec.target());
     }
 
     static OrderExec readOrderExec(PlanStreamInput in) throws IOException {
         return new OrderExec(
-            Source.EMPTY,
+            in.readSource(),
             in.readPhysicalPlanNode(),
             in.readCollectionAsList(readerFromPlanReader(PlanNamedTypes::readOrder))
         );
     }
 
     static void writeOrderExec(PlanStreamOutput out, OrderExec orderExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(orderExec.child());
         out.writeCollection(orderExec.order(), writerFromPlanWriter(PlanNamedTypes::writeOrder));
     }
 
     static ProjectExec readProjectExec(PlanStreamInput in) throws IOException {
-        return new ProjectExec(Source.EMPTY, in.readPhysicalPlanNode(), readNamedExpressions(in));
+        return new ProjectExec(in.readSource(), in.readPhysicalPlanNode(), readNamedExpressions(in));
     }
 
     static void writeProjectExec(PlanStreamOutput out, ProjectExec projectExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(projectExec.child());
         writeNamedExpressions(out, projectExec.projections());
     }
 
     static RowExec readRowExec(PlanStreamInput in) throws IOException {
-        return new RowExec(Source.EMPTY, readAliases(in));
+        return new RowExec(in.readSource(), readAliases(in));
     }
 
     static void writeRowExec(PlanStreamOutput out, RowExec rowExec) throws IOException {
         assert rowExec.children().size() == 0;
+        out.writeNoSource();
         writeAliases(out, rowExec.fields());
     }
 
     @SuppressWarnings("unchecked")
     static ShowExec readShowExec(PlanStreamInput in) throws IOException {
-        return new ShowExec(Source.EMPTY, readAttributes(in), (List<List<Object>>) in.readGenericValue());
+        return new ShowExec(in.readSource(), readAttributes(in), (List<List<Object>>) in.readGenericValue());
     }
 
     static void writeShowExec(PlanStreamOutput out, ShowExec showExec) throws IOException {
+        out.writeNoSource();
         writeAttributes(out, showExec.output());
         out.writeGenericValue(showExec.values());
     }
 
     static TopNExec readTopNExec(PlanStreamInput in) throws IOException {
         return new TopNExec(
-            Source.EMPTY,
+            in.readSource(),
             in.readPhysicalPlanNode(),
             in.readCollectionAsList(readerFromPlanReader(PlanNamedTypes::readOrder)),
             in.readNamed(Expression.class),
@@ -626,6 +645,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeTopNExec(PlanStreamOutput out, TopNExec topNExec) throws IOException {
+        out.writeNoSource();
         out.writePhysicalPlanNode(topNExec.child());
         out.writeCollection(topNExec.order(), writerFromPlanWriter(PlanNamedTypes::writeOrder));
         out.writeExpression(topNExec.limit());
@@ -635,7 +655,7 @@ public final class PlanNamedTypes {
     // -- Logical plan nodes
     static Aggregate readAggregate(PlanStreamInput in) throws IOException {
         return new Aggregate(
-            Source.EMPTY,
+            in.readSource(),
             in.readLogicalPlanNode(),
             in.readCollectionAsList(readerFromPlanReader(PlanStreamInput::readExpression)),
             readNamedExpressions(in)
@@ -643,16 +663,18 @@ public final class PlanNamedTypes {
     }
 
     static void writeAggregate(PlanStreamOutput out, Aggregate aggregate) throws IOException {
+        out.writeNoSource();
         out.writeLogicalPlanNode(aggregate.child());
         out.writeCollection(aggregate.groupings(), writerFromPlanWriter(PlanStreamOutput::writeExpression));
         writeNamedExpressions(out, aggregate.aggregates());
     }
 
     static Dissect readDissect(PlanStreamInput in) throws IOException {
-        return new Dissect(Source.EMPTY, in.readLogicalPlanNode(), in.readExpression(), readDissectParser(in), readAttributes(in));
+        return new Dissect(in.readSource(), in.readLogicalPlanNode(), in.readExpression(), readDissectParser(in), readAttributes(in));
     }
 
     static void writeDissect(PlanStreamOutput out, Dissect dissect) throws IOException {
+        out.writeNoSource();
         out.writeLogicalPlanNode(dissect.child());
         out.writeExpression(dissect.input());
         writeDissectParser(out, dissect.parser());
@@ -660,27 +682,29 @@ public final class PlanNamedTypes {
     }
 
     static EsRelation readEsRelation(PlanStreamInput in) throws IOException {
-        return new EsRelation(Source.EMPTY, readEsIndex(in), readAttributes(in));
+        return new EsRelation(in.readSource(), readEsIndex(in), readAttributes(in));
     }
 
     static void writeEsRelation(PlanStreamOutput out, EsRelation relation) throws IOException {
         assert relation.children().size() == 0;
+        out.writeNoSource();
         writeEsIndex(out, relation.index());
         writeAttributes(out, relation.output());
     }
 
     static Eval readEval(PlanStreamInput in) throws IOException {
-        return new Eval(Source.EMPTY, in.readLogicalPlanNode(), readAliases(in));
+        return new Eval(in.readSource(), in.readLogicalPlanNode(), readAliases(in));
     }
 
     static void writeEval(PlanStreamOutput out, Eval eval) throws IOException {
+        out.writeNoSource();
         out.writeLogicalPlanNode(eval.child());
         writeAliases(out, eval.fields());
     }
 
     static Enrich readEnrich(PlanStreamInput in) throws IOException {
         return new Enrich(
-            Source.EMPTY,
+            in.readSource(),
             in.readLogicalPlanNode(),
             in.readExpression(),
             in.readNamedExpression(),
@@ -690,6 +714,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeEnrich(PlanStreamOutput out, Enrich enrich) throws IOException {
+        out.writeNoSource();
         out.writeLogicalPlanNode(enrich.child());
         out.writeExpression(enrich.policyName());
         out.writeNamedExpression(enrich.matchField());
@@ -700,25 +725,28 @@ public final class PlanNamedTypes {
     }
 
     static Filter readFilter(PlanStreamInput in) throws IOException {
-        return new Filter(Source.EMPTY, in.readLogicalPlanNode(), in.readExpression());
+        return new Filter(in.readSource(), in.readLogicalPlanNode(), in.readExpression());
     }
 
     static void writeFilter(PlanStreamOutput out, Filter filter) throws IOException {
+        out.writeNoSource();
         out.writeLogicalPlanNode(filter.child());
         out.writeExpression(filter.condition());
     }
 
     static Grok readGrok(PlanStreamInput in) throws IOException {
+        Source source;
         return new Grok(
-            Source.EMPTY,
+            source = in.readSource(),
             in.readLogicalPlanNode(),
             in.readExpression(),
-            Grok.pattern(Source.EMPTY, in.readString()),
+            Grok.pattern(source, in.readString()),
             readAttributes(in)
         );
     }
 
     static void writeGrok(PlanStreamOutput out, Grok grok) throws IOException {
+        out.writeNoSource();
         out.writeLogicalPlanNode(grok.child());
         out.writeExpression(grok.input());
         out.writeString(grok.parser().pattern());
@@ -726,39 +754,42 @@ public final class PlanNamedTypes {
     }
 
     static Limit readLimit(PlanStreamInput in) throws IOException {
-        return new Limit(Source.EMPTY, in.readNamed(Expression.class), in.readLogicalPlanNode());
+        return new Limit(in.readSource(), in.readNamed(Expression.class), in.readLogicalPlanNode());
     }
 
     static void writeLimit(PlanStreamOutput out, Limit limit) throws IOException {
+        out.writeNoSource();
         out.writeExpression(limit.limit());
         out.writeLogicalPlanNode(limit.child());
     }
 
     static OrderBy readOrderBy(PlanStreamInput in) throws IOException {
         return new OrderBy(
-            Source.EMPTY,
+            in.readSource(),
             in.readLogicalPlanNode(),
             in.readCollectionAsList(readerFromPlanReader(PlanNamedTypes::readOrder))
         );
     }
 
     static void writeOrderBy(PlanStreamOutput out, OrderBy order) throws IOException {
+        out.writeNoSource();
         out.writeLogicalPlanNode(order.child());
         out.writeCollection(order.order(), writerFromPlanWriter(PlanNamedTypes::writeOrder));
     }
 
     static Project readProject(PlanStreamInput in) throws IOException {
-        return new Project(Source.EMPTY, in.readLogicalPlanNode(), readNamedExpressions(in));
+        return new Project(in.readSource(), in.readLogicalPlanNode(), readNamedExpressions(in));
     }
 
     static void writeProject(PlanStreamOutput out, Project project) throws IOException {
+        out.writeNoSource();
         out.writeLogicalPlanNode(project.child());
         writeNamedExpressions(out, project.projections());
     }
 
     static TopN readTopN(PlanStreamInput in) throws IOException {
         return new TopN(
-            Source.EMPTY,
+            in.readSource(),
             in.readLogicalPlanNode(),
             in.readCollectionAsList(readerFromPlanReader(PlanNamedTypes::readOrder)),
             in.readNamed(Expression.class)
@@ -766,6 +797,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeTopN(PlanStreamOutput out, TopN topN) throws IOException {
+        out.writeNoSource();
         out.writeLogicalPlanNode(topN.child());
         out.writeCollection(topN.order(), writerFromPlanWriter(PlanNamedTypes::writeOrder));
         out.writeExpression(topN.limit());
@@ -801,7 +833,7 @@ public final class PlanNamedTypes {
 
     static FieldAttribute readFieldAttribute(PlanStreamInput in) throws IOException {
         return new FieldAttribute(
-            Source.EMPTY,
+            in.readSource(),
             in.readOptionalWithReader(PlanNamedTypes::readFieldAttribute),
             in.readString(),
             in.dataTypeFromTypeName(in.readString()),
@@ -814,6 +846,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeFieldAttribute(PlanStreamOutput out, FieldAttribute fileAttribute) throws IOException {
+        out.writeNoSource();
         out.writeOptionalWriteable(fileAttribute.parent() == null ? null : o -> writeFieldAttribute(out, fileAttribute.parent()));
         out.writeString(fileAttribute.name());
         out.writeString(fileAttribute.dataType().typeName());
@@ -826,7 +859,7 @@ public final class PlanNamedTypes {
 
     static ReferenceAttribute readReferenceAttr(PlanStreamInput in) throws IOException {
         return new ReferenceAttribute(
-            Source.EMPTY,
+            in.readSource(),
             in.readString(),
             in.dataTypeFromTypeName(in.readString()),
             in.readOptionalString(),
@@ -837,6 +870,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeReferenceAttr(PlanStreamOutput out, ReferenceAttribute referenceAttribute) throws IOException {
+        out.writeNoSource();
         out.writeString(referenceAttribute.name());
         out.writeString(referenceAttribute.dataType().typeName());
         out.writeOptionalString(referenceAttribute.qualifier());
@@ -847,7 +881,7 @@ public final class PlanNamedTypes {
 
     static MetadataAttribute readMetadataAttr(PlanStreamInput in) throws IOException {
         return new MetadataAttribute(
-            Source.EMPTY,
+            in.readSource(),
             in.readString(),
             in.dataTypeFromTypeName(in.readString()),
             in.readOptionalString(),
@@ -859,6 +893,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeMetadataAttr(PlanStreamOutput out, MetadataAttribute metadataAttribute) throws IOException {
+        out.writeNoSource();
         out.writeString(metadataAttribute.name());
         out.writeString(metadataAttribute.dataType().typeName());
         out.writeOptionalString(metadataAttribute.qualifier());
@@ -870,7 +905,7 @@ public final class PlanNamedTypes {
 
     static UnsupportedAttribute readUnsupportedAttr(PlanStreamInput in) throws IOException {
         return new UnsupportedAttribute(
-            Source.EMPTY,
+            in.readSource(),
             in.readString(),
             readUnsupportedEsField(in),
             in.readOptionalString(),
@@ -879,6 +914,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeUnsupportedAttr(PlanStreamOutput out, UnsupportedAttribute unsupportedAttribute) throws IOException {
+        out.writeNoSource();
         out.writeString(unsupportedAttribute.name());
         writeUnsupportedEsField(out, unsupportedAttribute.field());
         out.writeOptionalString(unsupportedAttribute.hasCustomMessage() ? unsupportedAttribute.unresolvedMessage() : null);
@@ -983,22 +1019,24 @@ public final class PlanNamedTypes {
     // -- BinaryComparison
 
     static BinaryComparison readBinComparison(PlanStreamInput in, String name) throws IOException {
+        var source = in.readSource();
         var operation = in.readEnum(BinaryComparisonProcessor.BinaryComparisonOperation.class);
         var left = in.readExpression();
         var right = in.readExpression();
         var zoneId = in.readOptionalZoneId();
         return switch (operation) {
-            case EQ -> new Equals(Source.EMPTY, left, right, zoneId);
-            case NULLEQ -> new NullEquals(Source.EMPTY, left, right, zoneId);
-            case NEQ -> new NotEquals(Source.EMPTY, left, right, zoneId);
-            case GT -> new GreaterThan(Source.EMPTY, left, right, zoneId);
-            case GTE -> new GreaterThanOrEqual(Source.EMPTY, left, right, zoneId);
-            case LT -> new LessThan(Source.EMPTY, left, right, zoneId);
-            case LTE -> new LessThanOrEqual(Source.EMPTY, left, right, zoneId);
+            case EQ -> new Equals(source, left, right, zoneId);
+            case NULLEQ -> new NullEquals(source, left, right, zoneId);
+            case NEQ -> new NotEquals(source, left, right, zoneId);
+            case GT -> new GreaterThan(source, left, right, zoneId);
+            case GTE -> new GreaterThanOrEqual(source, left, right, zoneId);
+            case LT -> new LessThan(source, left, right, zoneId);
+            case LTE -> new LessThanOrEqual(source, left, right, zoneId);
         };
     }
 
     static void writeBinComparison(PlanStreamOutput out, BinaryComparison binaryComparison) throws IOException {
+        out.writeSource(binaryComparison.source());
         out.writeEnum(binaryComparison.function());
         out.writeExpression(binaryComparison.left());
         out.writeExpression(binaryComparison.right());
@@ -1008,10 +1046,11 @@ public final class PlanNamedTypes {
     // -- InComparison
 
     static In readInComparison(PlanStreamInput in) throws IOException {
-        return new In(Source.EMPTY, in.readExpression(), in.readCollectionAsList(readerFromPlanReader(PlanStreamInput::readExpression)));
+        return new In(in.readSource(), in.readExpression(), in.readCollectionAsList(readerFromPlanReader(PlanStreamInput::readExpression)));
     }
 
     static void writeInComparison(PlanStreamOutput out, In in) throws IOException {
+        out.writeSource(in.source());
         out.writeExpression(in.value());
         out.writeCollection(in.list(), writerFromPlanWriter(PlanStreamOutput::writeExpression));
     }
@@ -1019,19 +1058,21 @@ public final class PlanNamedTypes {
     // -- RegexMatch
 
     static WildcardLike readWildcardLike(PlanStreamInput in, String name) throws IOException {
-        return new WildcardLike(Source.EMPTY, in.readExpression(), new WildcardPattern(in.readString()));
+        return new WildcardLike(in.readSource(), in.readExpression(), new WildcardPattern(in.readString()));
     }
 
     static void writeWildcardLike(PlanStreamOutput out, WildcardLike like) throws IOException {
+        out.writeSource(like.source());
         out.writeExpression(like.field());
         out.writeString(like.pattern().pattern());
     }
 
     static RLike readRLike(PlanStreamInput in, String name) throws IOException {
-        return new RLike(Source.EMPTY, in.readExpression(), new RLikePattern(in.readString()));
+        return new RLike(in.readSource(), in.readExpression(), new RLikePattern(in.readString()));
     }
 
     static void writeRLike(PlanStreamOutput out, RLike like) throws IOException {
+        out.writeSource(like.source());
         out.writeExpression(like.field());
         out.writeString(like.pattern().asJavaRegex());
     }
@@ -1044,12 +1085,14 @@ public final class PlanNamedTypes {
     );
 
     static BinaryLogic readBinaryLogic(PlanStreamInput in, String name) throws IOException {
+        var source = in.readSource();
         var left = in.readExpression();
         var right = in.readExpression();
-        return BINARY_LOGIC_CTRS.get(name).apply(Source.EMPTY, left, right);
+        return BINARY_LOGIC_CTRS.get(name).apply(source, left, right);
     }
 
     static void writeBinaryLogic(PlanStreamOutput out, BinaryLogic binaryLogic) throws IOException {
+        out.writeNoSource();
         out.writeExpression(binaryLogic.left());
         out.writeExpression(binaryLogic.right());
     }
@@ -1097,10 +1140,11 @@ public final class PlanNamedTypes {
         if (ctr == null) {
             throw new IOException("Constructor for ESQLUnaryScalar not found for name:" + name);
         }
-        return ctr.apply(Source.EMPTY, in.readExpression());
+        return ctr.apply(in.readSource(), in.readExpression());
     }
 
     static void writeESQLUnaryScalar(PlanStreamOutput out, UnaryScalarFunction function) throws IOException {
+        out.writeSource(function.source());
         out.writeExpression(function.field());
     }
 
@@ -1115,10 +1159,12 @@ public final class PlanNamedTypes {
         if (ctr == null) {
             throw new IOException("Constructor not found:" + name);
         }
-        return ctr.apply(Source.EMPTY);
+        return ctr.apply(in.readSource());
     }
 
-    static void writeNoArgScalar(PlanStreamOutput out, ScalarFunction function) {}
+    static void writeNoArgScalar(PlanStreamOutput out, ScalarFunction function) throws IOException {
+        out.writeNoSource();
+    }
 
     static final Map<
         String,
@@ -1135,30 +1181,33 @@ public final class PlanNamedTypes {
         if (ctr == null) {
             throw new IOException("Constructor for QLUnaryScalar not found for name:" + name);
         }
-        return ctr.apply(Source.EMPTY, in.readExpression());
+        return ctr.apply(in.readSource(), in.readExpression());
     }
 
     static void writeQLUnaryScalar(PlanStreamOutput out, org.elasticsearch.xpack.ql.expression.function.scalar.UnaryScalarFunction function)
         throws IOException {
+        out.writeSource(function.source());
         out.writeExpression(function.field());
     }
 
     // -- ScalarFunction
 
     static Atan2 readAtan2(PlanStreamInput in) throws IOException {
-        return new Atan2(Source.EMPTY, in.readExpression(), in.readExpression());
+        return new Atan2(in.readSource(), in.readExpression(), in.readExpression());
     }
 
     static void writeAtan2(PlanStreamOutput out, Atan2 atan2) throws IOException {
+        out.writeSource(atan2.source());
         out.writeExpression(atan2.y());
         out.writeExpression(atan2.x());
     }
 
     static AutoBucket readAutoBucket(PlanStreamInput in) throws IOException {
-        return new AutoBucket(Source.EMPTY, in.readExpression(), in.readExpression(), in.readExpression(), in.readExpression());
+        return new AutoBucket(in.readSource(), in.readExpression(), in.readExpression(), in.readExpression(), in.readExpression());
     }
 
     static void writeAutoBucket(PlanStreamOutput out, AutoBucket bucket) throws IOException {
+        out.writeSource(bucket.source());
         out.writeExpression(bucket.field());
         out.writeExpression(bucket.buckets());
         out.writeExpression(bucket.from());
@@ -1175,10 +1224,11 @@ public final class PlanNamedTypes {
 
     static ScalarFunction readVarag(PlanStreamInput in, String name) throws IOException {
         return VARARG_CTORS.get(name)
-            .apply(Source.EMPTY, in.readExpression(), in.readCollectionAsList(readerFromPlanReader(PlanStreamInput::readExpression)));
+            .apply(in.readSource(), in.readExpression(), in.readCollectionAsList(readerFromPlanReader(PlanStreamInput::readExpression)));
     }
 
     static void writeVararg(PlanStreamOutput out, ScalarFunction vararg) throws IOException {
+        out.writeSource(vararg.source());
         out.writeExpression(vararg.children().get(0));
         out.writeCollection(
             vararg.children().subList(1, vararg.children().size()),
@@ -1187,21 +1237,23 @@ public final class PlanNamedTypes {
     }
 
     static CountDistinct readCountDistinct(PlanStreamInput in) throws IOException {
-        return new CountDistinct(Source.EMPTY, in.readExpression(), in.readOptionalNamed(Expression.class));
+        return new CountDistinct(in.readSource(), in.readExpression(), in.readOptionalNamed(Expression.class));
     }
 
     static void writeCountDistinct(PlanStreamOutput out, CountDistinct countDistinct) throws IOException {
         List<Expression> fields = countDistinct.children();
         assert fields.size() == 1 || fields.size() == 2;
+        out.writeNoSource();
         out.writeExpression(fields.get(0));
         out.writeOptionalWriteable(fields.size() == 2 ? o -> out.writeExpression(fields.get(1)) : null);
     }
 
     static DateExtract readDateExtract(PlanStreamInput in) throws IOException {
-        return new DateExtract(Source.EMPTY, in.readExpression(), in.readExpression(), in.configuration());
+        return new DateExtract(in.readSource(), in.readExpression(), in.readExpression(), in.configuration());
     }
 
     static void writeDateExtract(PlanStreamOutput out, DateExtract function) throws IOException {
+        out.writeSource(function.source());
         List<Expression> fields = function.children();
         assert fields.size() == 2;
         out.writeExpression(fields.get(0));
@@ -1209,10 +1261,11 @@ public final class PlanNamedTypes {
     }
 
     static DateFormat readDateFormat(PlanStreamInput in) throws IOException {
-        return new DateFormat(Source.EMPTY, in.readExpression(), in.readOptionalNamed(Expression.class), in.configuration());
+        return new DateFormat(in.readSource(), in.readExpression(), in.readOptionalNamed(Expression.class), in.configuration());
     }
 
     static void writeDateFormat(PlanStreamOutput out, DateFormat dateFormat) throws IOException {
+        out.writeSource(dateFormat.source());
         List<Expression> fields = dateFormat.children();
         assert fields.size() == 1 || fields.size() == 2;
         out.writeExpression(fields.get(0));
@@ -1220,10 +1273,11 @@ public final class PlanNamedTypes {
     }
 
     static DateParse readDateTimeParse(PlanStreamInput in) throws IOException {
-        return new DateParse(Source.EMPTY, in.readExpression(), in.readOptionalNamed(Expression.class));
+        return new DateParse(in.readSource(), in.readExpression(), in.readOptionalNamed(Expression.class));
     }
 
     static void writeDateTimeParse(PlanStreamOutput out, DateParse function) throws IOException {
+        out.writeSource(function.source());
         List<Expression> fields = function.children();
         assert fields.size() == 1 || fields.size() == 2;
         out.writeExpression(fields.get(0));
@@ -1231,10 +1285,11 @@ public final class PlanNamedTypes {
     }
 
     static DateTrunc readDateTrunc(PlanStreamInput in) throws IOException {
-        return new DateTrunc(Source.EMPTY, in.readExpression(), in.readExpression());
+        return new DateTrunc(in.readSource(), in.readExpression(), in.readExpression());
     }
 
     static void writeDateTrunc(PlanStreamOutput out, DateTrunc dateTrunc) throws IOException {
+        out.writeSource(dateTrunc.source());
         List<Expression> fields = dateTrunc.children();
         assert fields.size() == 2;
         out.writeExpression(fields.get(0));
@@ -1242,45 +1297,51 @@ public final class PlanNamedTypes {
     }
 
     static Now readNow(PlanStreamInput in) throws IOException {
-        return new Now(Source.EMPTY, in.configuration());
+        return new Now(in.readSource(), in.configuration());
     }
 
-    static void writeNow(PlanStreamOutput out, Now function) {}
+    static void writeNow(PlanStreamOutput out, Now function) throws IOException {
+        out.writeNoSource();
+    }
 
     static Round readRound(PlanStreamInput in) throws IOException {
-        return new Round(Source.EMPTY, in.readExpression(), in.readOptionalNamed(Expression.class));
+        return new Round(in.readSource(), in.readExpression(), in.readOptionalNamed(Expression.class));
     }
 
     static void writeRound(PlanStreamOutput out, Round round) throws IOException {
+        out.writeSource(round.source());
         out.writeExpression(round.field());
         out.writeOptionalExpression(round.decimals());
     }
 
     static Pow readPow(PlanStreamInput in) throws IOException {
-        return new Pow(Source.EMPTY, in.readExpression(), in.readExpression());
+        return new Pow(in.readSource(), in.readExpression(), in.readExpression());
     }
 
     static void writePow(PlanStreamOutput out, Pow pow) throws IOException {
+        out.writeSource(pow.source());
         out.writeExpression(pow.base());
         out.writeExpression(pow.exponent());
     }
 
     static Percentile readPercentile(PlanStreamInput in) throws IOException {
-        return new Percentile(Source.EMPTY, in.readExpression(), in.readExpression());
+        return new Percentile(in.readSource(), in.readExpression(), in.readExpression());
     }
 
     static void writePercentile(PlanStreamOutput out, Percentile percentile) throws IOException {
         List<Expression> fields = percentile.children();
         assert fields.size() == 2 : "percentile() aggregation must have two arguments";
+        out.writeNoSource();
         out.writeExpression(fields.get(0));
         out.writeExpression(fields.get(1));
     }
 
     static StartsWith readStartsWith(PlanStreamInput in) throws IOException {
-        return new StartsWith(Source.EMPTY, in.readExpression(), in.readExpression());
+        return new StartsWith(in.readSource(), in.readExpression(), in.readExpression());
     }
 
     static void writeStartsWith(PlanStreamOutput out, StartsWith startsWith) throws IOException {
+        out.writeSource(startsWith.source());
         List<Expression> fields = startsWith.children();
         assert fields.size() == 2;
         out.writeExpression(fields.get(0));
@@ -1288,21 +1349,23 @@ public final class PlanNamedTypes {
     }
 
     static EndsWith readEndsWith(PlanStreamInput in) throws IOException {
-        return new EndsWith(Source.EMPTY, in.readExpression(), in.readExpression());
+        return new EndsWith(in.readSource(), in.readExpression(), in.readExpression());
     }
 
     static void writeEndsWith(PlanStreamOutput out, EndsWith endsWith) throws IOException {
         List<Expression> fields = endsWith.children();
         assert fields.size() == 2;
+        out.writeNoSource();
         out.writeExpression(fields.get(0));
         out.writeExpression(fields.get(1));
     }
 
     static Substring readSubstring(PlanStreamInput in) throws IOException {
-        return new Substring(Source.EMPTY, in.readExpression(), in.readExpression(), in.readOptionalNamed(Expression.class));
+        return new Substring(in.readSource(), in.readExpression(), in.readExpression(), in.readOptionalNamed(Expression.class));
     }
 
     static void writeSubstring(PlanStreamOutput out, Substring substring) throws IOException {
+        out.writeSource(substring.source());
         List<Expression> fields = substring.children();
         assert fields.size() == 2 || fields.size() == 3;
         out.writeExpression(fields.get(0));
@@ -1323,10 +1386,11 @@ public final class PlanNamedTypes {
     }
 
     static Left readLeft(PlanStreamInput in) throws IOException {
-        return new Left(Source.EMPTY, in.readExpression(), in.readExpression());
+        return new Left(in.readSource(), in.readExpression(), in.readExpression());
     }
 
     static void writeLeft(PlanStreamOutput out, Left left) throws IOException {
+        out.writeSource(left.source());
         List<Expression> fields = left.children();
         assert fields.size() == 2;
         out.writeExpression(fields.get(0));
@@ -1334,10 +1398,11 @@ public final class PlanNamedTypes {
     }
 
     static Right readRight(PlanStreamInput in) throws IOException {
-        return new Right(Source.EMPTY, in.readExpression(), in.readExpression());
+        return new Right(in.readSource(), in.readExpression(), in.readExpression());
     }
 
     static void writeRight(PlanStreamOutput out, Right right) throws IOException {
+        out.writeSource(right.source());
         List<Expression> fields = right.children();
         assert fields.size() == 2;
         out.writeExpression(fields.get(0));
@@ -1345,23 +1410,25 @@ public final class PlanNamedTypes {
     }
 
     static Split readSplit(PlanStreamInput in) throws IOException {
-        return new Split(Source.EMPTY, in.readExpression(), in.readExpression());
+        return new Split(in.readSource(), in.readExpression(), in.readExpression());
     }
 
     static void writeSplit(PlanStreamOutput out, Split split) throws IOException {
+        out.writeSource(split.source());
         out.writeExpression(split.left());
         out.writeExpression(split.right());
     }
 
     static CIDRMatch readCIDRMatch(PlanStreamInput in) throws IOException {
         return new CIDRMatch(
-            Source.EMPTY,
+            in.readSource(),
             in.readExpression(),
             in.readCollectionAsList(readerFromPlanReader(PlanStreamInput::readExpression))
         );
     }
 
     static void writeCIDRMatch(PlanStreamOutput out, CIDRMatch cidrMatch) throws IOException {
+        out.writeSource(cidrMatch.source());
         List<Expression> children = cidrMatch.children();
         assert children.size() > 1;
         out.writeExpression(children.get(0));
@@ -1379,12 +1446,14 @@ public final class PlanNamedTypes {
     );
 
     static ArithmeticOperation readArithmeticOperation(PlanStreamInput in, String name) throws IOException {
+        var source = in.readSource();
         var left = in.readExpression();
         var right = in.readExpression();
-        return ARITHMETIC_CTRS.get(name).apply(Source.EMPTY, left, right);
+        return ARITHMETIC_CTRS.get(name).apply(source, left, right);
     }
 
     static void writeArithmeticOperation(PlanStreamOutput out, ArithmeticOperation arithmeticOperation) throws IOException {
+        out.writeSource(arithmeticOperation.source());
         out.writeExpression(arithmeticOperation.left());
         out.writeExpression(arithmeticOperation.right());
     }
@@ -1401,10 +1470,11 @@ public final class PlanNamedTypes {
     );
 
     static AggregateFunction readAggFunction(PlanStreamInput in, String name) throws IOException {
-        return AGG_CTRS.get(name).apply(Source.EMPTY, in.readExpression());
+        return AGG_CTRS.get(name).apply(in.readSource(), in.readExpression());
     }
 
     static void writeAggFunction(PlanStreamOutput out, AggregateFunction aggregateFunction) throws IOException {
+        out.writeNoSource();
         out.writeExpression(aggregateFunction.field());
     }
 
@@ -1420,18 +1490,20 @@ public final class PlanNamedTypes {
     );
 
     static AbstractMultivalueFunction readMvFunction(PlanStreamInput in, String name) throws IOException {
-        return MV_CTRS.get(name).apply(Source.EMPTY, in.readExpression());
+        return MV_CTRS.get(name).apply(in.readSource(), in.readExpression());
     }
 
     static void writeMvFunction(PlanStreamOutput out, AbstractMultivalueFunction fn) throws IOException {
+        out.writeNoSource();
         out.writeExpression(fn.field());
     }
 
     static MvConcat readMvConcat(PlanStreamInput in) throws IOException {
-        return new MvConcat(Source.EMPTY, in.readExpression(), in.readExpression());
+        return new MvConcat(in.readSource(), in.readExpression(), in.readExpression());
     }
 
     static void writeMvConcat(PlanStreamOutput out, MvConcat fn) throws IOException {
+        out.writeNoSource();
         out.writeExpression(fn.left());
         out.writeExpression(fn.right());
     }
@@ -1440,7 +1512,7 @@ public final class PlanNamedTypes {
 
     static Alias readAlias(PlanStreamInput in) throws IOException {
         return new Alias(
-            Source.EMPTY,
+            in.readSource(),
             in.readString(),
             in.readOptionalString(),
             in.readNamed(Expression.class),
@@ -1450,6 +1522,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeAlias(PlanStreamOutput out, Alias alias) throws IOException {
+        out.writeNoSource();
         out.writeString(alias.name());
         out.writeOptionalString(alias.qualifier());
         out.writeExpression(alias.child());
@@ -1460,17 +1533,18 @@ public final class PlanNamedTypes {
     // -- Expressions (other)
 
     static Literal readLiteral(PlanStreamInput in) throws IOException {
-        return new Literal(Source.EMPTY, in.readGenericValue(), in.dataTypeFromTypeName(in.readString()));
+        return new Literal(in.readSource(), in.readGenericValue(), in.dataTypeFromTypeName(in.readString()));
     }
 
     static void writeLiteral(PlanStreamOutput out, Literal literal) throws IOException {
+        out.writeNoSource();
         out.writeGenericValue(literal.value());
         out.writeString(literal.dataType().typeName());
     }
 
     static Order readOrder(PlanStreamInput in) throws IOException {
         return new org.elasticsearch.xpack.esql.expression.Order(
-            Source.EMPTY,
+            in.readSource(),
             in.readNamed(Expression.class),
             in.readEnum(Order.OrderDirection.class),
             in.readEnum(Order.NullsPosition.class)
@@ -1478,6 +1552,7 @@ public final class PlanNamedTypes {
     }
 
     static void writeOrder(PlanStreamOutput out, Order order) throws IOException {
+        out.writeNoSource();
         out.writeExpression(order.child());
         out.writeEnum(order.direction());
         out.writeEnum(order.nullsPosition());

+ 47 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java

@@ -11,6 +11,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanNamedReader;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanReader;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
@@ -23,8 +24,11 @@ import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.NameId;
 import org.elasticsearch.xpack.ql.expression.NamedExpression;
 import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.ql.tree.Location;
+import org.elasticsearch.xpack.ql.tree.Source;
 import org.elasticsearch.xpack.ql.type.DataType;
 import org.elasticsearch.xpack.ql.type.EsField;
+import org.elasticsearch.xpack.ql.util.StringUtils;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -62,7 +66,7 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput {
     // hook for nameId, where can cache and map, for now just return a NameId of the same long value.
     private final LongFunction<NameId> nameIdFunction;
 
-    private EsqlConfiguration configuration;
+    private final EsqlConfiguration configuration;
 
     public PlanStreamInput(
         StreamInput streamInput,
@@ -101,6 +105,48 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput {
         return readNamed(PhysicalPlan.class);
     }
 
+    public Source readSource() throws IOException {
+        boolean hasSource = readBoolean();
+        if (hasSource) {
+            int line = readInt();
+            int column = readInt();
+            int length = readInt();
+            int charPositionInLine = column - 1;
+            return new Source(new Location(line, charPositionInLine), sourceText(configuration.query(), line, column, length));
+        }
+        return Source.EMPTY;
+    }
+
+    private static String sourceText(String query, int line, int column, int length) {
+        if (line <= 0 || column <= 0 || query.isEmpty()) {
+            return StringUtils.EMPTY;
+        }
+        int offset = textOffset(query, line, column);
+        if (offset + length > query.length()) {
+            throw new EsqlIllegalArgumentException(
+                "location [@" + line + ":" + column + "] and length [" + length + "] overrun query size [" + query.length() + "]"
+            );
+        }
+        return query.substring(offset, offset + length);
+    }
+
+    private static int textOffset(String query, int line, int column) {
+        int offset = 0;
+        if (line > 1) {
+            String[] lines = query.split("\n");
+            if (line > lines.length) {
+                throw new EsqlIllegalArgumentException(
+                    "line location [" + line + "] higher than max [" + lines.length + "] in query [" + query + "]"
+                );
+            }
+            for (int i = 0; i < line - 1; i++) {
+                offset += lines[i].length() + 1; // +1 accounts for the removed \n
+            }
+        }
+        offset += column - 1; // -1 since column is 1-based indexed
+        return offset;
+    }
+
     public Expression readExpression() throws IOException {
         return readNamed(Expression.class);
     }

+ 12 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java

@@ -15,6 +15,7 @@ import org.elasticsearch.xpack.ql.expression.Attribute;
 import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.NamedExpression;
 import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.ql.tree.Source;
 
 import java.io.IOException;
 import java.util.function.Function;
@@ -49,6 +50,17 @@ public final class PlanStreamOutput extends OutputStreamStreamOutput {
         writeNamed(PhysicalPlan.class, physicalPlan);
     }
 
+    public void writeSource(Source source) throws IOException {
+        writeBoolean(true);
+        writeInt(source.source().getLineNumber());
+        writeInt(source.source().getColumnNumber());
+        writeInt(source.text().length());
+    }
+
+    public void writeNoSource() throws IOException {
+        writeBoolean(false);
+    }
+
     public void writeExpression(Expression expression) throws IOException {
         writeNamed(Expression.class, expression);
     }

+ 2 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -107,7 +107,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             clusterService.getClusterName().value(),
             request.pragmas(),
             EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.get(settings),
-            EsqlPlugin.QUERY_RESULT_TRUNCATION_DEFAULT_SIZE.get(settings)
+            EsqlPlugin.QUERY_RESULT_TRUNCATION_DEFAULT_SIZE.get(settings),
+            request.query()
         );
         String sessionId = sessionID(task);
         planExecutor.esql(

+ 45 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.xpack.esql.session;
 
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -14,12 +16,18 @@ import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.elasticsearch.xpack.ql.session.Configuration;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Locale;
 import java.util.Objects;
 
+import static org.elasticsearch.common.unit.ByteSizeUnit.KB;
+
 public class EsqlConfiguration extends Configuration implements Writeable {
+
+    static final int QUERY_COMPRESS_THRESHOLD_CHARS = KB.toIntBytes(5);
+
     private final QueryPragmas pragmas;
 
     private final int resultTruncationMaxSize;
@@ -27,6 +35,8 @@ public class EsqlConfiguration extends Configuration implements Writeable {
 
     private final Locale locale;
 
+    private final String query;
+
     public EsqlConfiguration(
         ZoneId zi,
         Locale locale,
@@ -34,13 +44,15 @@ public class EsqlConfiguration extends Configuration implements Writeable {
         String clusterName,
         QueryPragmas pragmas,
         int resultTruncationMaxSize,
-        int resultTruncationDefaultSize
+        int resultTruncationDefaultSize,
+        String query
     ) {
         super(zi, username, clusterName);
         this.locale = locale;
         this.pragmas = pragmas;
         this.resultTruncationMaxSize = resultTruncationMaxSize;
         this.resultTruncationDefaultSize = resultTruncationDefaultSize;
+        this.query = query;
     }
 
     public EsqlConfiguration(StreamInput in) throws IOException {
@@ -49,6 +61,7 @@ public class EsqlConfiguration extends Configuration implements Writeable {
         this.pragmas = new QueryPragmas(in);
         this.resultTruncationMaxSize = in.readVInt();
         this.resultTruncationDefaultSize = in.readVInt();
+        this.query = readQuery(in);
     }
 
     @Override
@@ -63,6 +76,7 @@ public class EsqlConfiguration extends Configuration implements Writeable {
         pragmas.writeTo(out);
         out.writeVInt(resultTruncationMaxSize);
         out.writeVInt(resultTruncationDefaultSize);
+        writeQuery(out, query);
     }
 
     public QueryPragmas pragmas() {
@@ -81,6 +95,33 @@ public class EsqlConfiguration extends Configuration implements Writeable {
         return locale;
     }
 
+    public String query() {
+        return query;
+    }
+
+    private static void writeQuery(StreamOutput out, String query) throws IOException {
+        if (query.length() > QUERY_COMPRESS_THRESHOLD_CHARS) { // compare on chars to avoid UTF-8 encoding unless actually required
+            out.writeBoolean(true);
+            var bytesArray = new BytesArray(query.getBytes(StandardCharsets.UTF_8));
+            var bytesRef = CompressorFactory.COMPRESSOR.compress(bytesArray);
+            out.writeByteArray(bytesRef.array());
+        } else {
+            out.writeBoolean(false);
+            out.writeString(query);
+        }
+    }
+
+    private static String readQuery(StreamInput in) throws IOException {
+        boolean compressed = in.readBoolean();
+        if (compressed) {
+            byte[] bytes = in.readByteArray();
+            var bytesRef = CompressorFactory.uncompress(new BytesArray(bytes));
+            return new String(bytesRef.array(), StandardCharsets.UTF_8);
+        } else {
+            return in.readString();
+        }
+    }
+
     @Override
     public boolean equals(Object o) {
         if (super.equals(o)) {
@@ -88,13 +129,14 @@ public class EsqlConfiguration extends Configuration implements Writeable {
             return resultTruncationMaxSize == that.resultTruncationMaxSize
                 && resultTruncationDefaultSize == that.resultTruncationDefaultSize
                 && Objects.equals(pragmas, that.pragmas)
-                && Objects.equals(locale, that.locale);
+                && Objects.equals(locale, that.locale)
+                && Objects.equals(that.query, query);
         }
         return false;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), pragmas, resultTruncationMaxSize, resultTruncationDefaultSize, locale);
+        return Objects.hash(super.hashCode(), pragmas, resultTruncationMaxSize, resultTruncationDefaultSize, locale, query);
     }
 }

+ 2 - 6
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

@@ -45,7 +45,6 @@ import org.elasticsearch.xpack.esql.CsvTestUtils.Type;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
-import org.elasticsearch.xpack.esql.analysis.Verifier;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolution;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
@@ -70,7 +69,6 @@ import org.elasticsearch.xpack.esql.planner.TestPhysicalOperationProviders;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
 import org.elasticsearch.xpack.esql.stats.DisabledSearchStats;
-import org.elasticsearch.xpack.esql.stats.Metrics;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.CsvSpecReader;
 import org.elasticsearch.xpack.ql.SpecReader;
@@ -102,6 +100,7 @@ import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
 import static org.elasticsearch.xpack.esql.CsvTestUtils.loadCsvSpecValues;
 import static org.elasticsearch.xpack.esql.CsvTestUtils.loadPageFromCsv;
 import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.CSV_DATASET_MAP;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
 import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.ESQL_THREAD_POOL_NAME;
 import static org.elasticsearch.xpack.ql.CsvSpecReader.specParser;
@@ -281,10 +280,7 @@ public class CsvTests extends ESTestCase {
     private PhysicalPlan physicalPlan(LogicalPlan parsed, CsvTestsDataLoader.TestsDataset dataset) {
         var indexResolution = loadIndexResolution(dataset.mappingFileName(), dataset.indexName());
         var enrichPolicies = loadEnrichPolicies();
-        var analyzer = new Analyzer(
-            new AnalyzerContext(configuration, functionRegistry, indexResolution, enrichPolicies),
-            new Verifier(new Metrics())
-        );
+        var analyzer = new Analyzer(new AnalyzerContext(configuration, functionRegistry, indexResolution, enrichPolicies), TEST_VERIFIER);
         var analyzed = analyzer.analyze(parsed);
         var logicalOptimized = logicalPlanOptimizer.optimize(analyzed);
         var physicalPlan = mapper.map(logicalOptimized);

+ 6 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java

@@ -27,6 +27,7 @@ import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
+import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
 import org.elasticsearch.xpack.ql.expression.Expression;
 
 import java.io.IOException;
@@ -48,6 +49,10 @@ public class SerializationTestUtils {
     }
 
     public static <T> T serializeDeserialize(T orig, Serializer<T> serializer, Deserializer<T> deserializer) {
+        return serializeDeserialize(orig, serializer, deserializer, EsqlTestUtils.TEST_CFG);
+    }
+
+    public static <T> T serializeDeserialize(T orig, Serializer<T> serializer, Deserializer<T> deserializer, EsqlConfiguration config) {
         try (BytesStreamOutput out = new BytesStreamOutput()) {
             PlanStreamOutput planStreamOutput = new PlanStreamOutput(out, planNameRegistry);
             serializer.write(planStreamOutput, orig);
@@ -55,7 +60,7 @@ public class SerializationTestUtils {
                 ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())),
                 writableRegistry()
             );
-            PlanStreamInput planStreamInput = new PlanStreamInput(in, planNameRegistry, writableRegistry(), EsqlTestUtils.TEST_CFG);
+            PlanStreamInput planStreamInput = new PlanStreamInput(in, planNameRegistry, writableRegistry(), config);
             return deserializer.read(planStreamInput);
         } catch (IOException e) {
             throw new UncheckedIOException(e);

+ 10 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTestUtils.java

@@ -12,7 +12,7 @@ import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolution;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.parser.EsqlParser;
-import org.elasticsearch.xpack.esql.stats.Metrics;
+import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
 import org.elasticsearch.xpack.ql.index.EsIndex;
 import org.elasticsearch.xpack.ql.index.IndexResolution;
 import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
@@ -21,6 +21,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
+
 public final class AnalyzerTestUtils {
 
     private AnalyzerTestUtils() {}
@@ -34,7 +37,7 @@ public final class AnalyzerTestUtils {
     }
 
     public static Analyzer analyzer(IndexResolution indexResolution) {
-        return analyzer(indexResolution, new Verifier(new Metrics()));
+        return analyzer(indexResolution, TEST_VERIFIER);
     }
 
     public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier) {
@@ -44,6 +47,10 @@ public final class AnalyzerTestUtils {
         );
     }
 
+    public static Analyzer analyzer(IndexResolution indexResolution, Verifier verifier, EsqlConfiguration config) {
+        return new Analyzer(new AnalyzerContext(config, new EsqlFunctionRegistry(), indexResolution, defaultEnrichResolution()), verifier);
+    }
+
     public static Analyzer analyzer(Verifier verifier) {
         return new Analyzer(
             new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), analyzerDefaultMapping(), defaultEnrichResolution()),
@@ -56,7 +63,7 @@ public final class AnalyzerTestUtils {
     }
 
     public static LogicalPlan analyze(String query, String mapping) {
-        return analyze(query, analyzer(loadMapping(mapping, "test")));
+        return analyze(query, analyzer(loadMapping(mapping, "test"), TEST_VERIFIER, configuration(query)));
     }
 
     public static LogicalPlan analyze(String query, Analyzer analyzer) {

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/ParsingTests.java

@@ -10,13 +10,13 @@ package org.elasticsearch.xpack.esql.analysis;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.parser.EsqlParser;
-import org.elasticsearch.xpack.esql.stats.Metrics;
 import org.elasticsearch.xpack.ql.ParsingException;
 import org.elasticsearch.xpack.ql.index.EsIndex;
 import org.elasticsearch.xpack.ql.index.IndexResolution;
 import org.elasticsearch.xpack.ql.type.TypesTests;
 
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyPolicyResolution;
 
 public class ParsingTests extends ESTestCase {
@@ -26,7 +26,7 @@ public class ParsingTests extends ESTestCase {
     private final IndexResolution defaultIndex = loadIndexResolution("mapping-basic.json");
     private final Analyzer defaultAnalyzer = new Analyzer(
         new AnalyzerContext(TEST_CFG, new EsqlFunctionRegistry(), defaultIndex, emptyPolicyResolution()),
-        new Verifier(new Metrics())
+        TEST_VERIFIER
     );
 
     public void testCaseFunctionInvalidInputs() {

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/WarningsTests.java

@@ -16,7 +16,7 @@ public class WarningsTests extends ESTestCase {
         warnings.registerException(new IllegalArgumentException());
         assertCriticalWarnings(
             "Line 1:2: evaluation of [foo] failed, treating result as null. Only first 20 failures recorded.",
-            "java.lang.IllegalArgumentException: null"
+            "Line 1:2: java.lang.IllegalArgumentException: null"
         );
     }
 
@@ -29,7 +29,7 @@ public class WarningsTests extends ESTestCase {
         String[] expected = new String[21];
         expected[0] = "Line 1:2: evaluation of [foo] failed, treating result as null. Only first 20 failures recorded.";
         for (int i = 0; i < Warnings.MAX_ADDED_WARNINGS; i++) {
-            expected[i + 1] = "java.lang.IllegalArgumentException: " + i;
+            expected[i + 1] = "Line 1:2: java.lang.IllegalArgumentException: " + i;
         }
 
         assertCriticalWarnings(expected);

+ 3 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/ToIPTests.java

@@ -53,7 +53,9 @@ public class ToIPTests extends AbstractFunctionTestCase {
                     DataTypes.IP,
                     equalTo(null)
                 ).withWarning("Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.")
-                    .withWarning("java.lang.IllegalArgumentException: '" + value.utf8ToString() + "' is not an IP string literal.");
+                    .withWarning(
+                        "Line -1:-1: java.lang.IllegalArgumentException: '" + value.utf8ToString() + "' is not an IP string literal."
+                    );
                 return testCase;
             }));
         }

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AcosTests.java

@@ -39,7 +39,7 @@ public class AcosTests extends AbstractFunctionTestCase {
                 Math.nextDown(-1d),
                 List.of(
                     "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                    "java.lang.ArithmeticException: Acos input out of range"
+                    "Line -1:-1: java.lang.ArithmeticException: Acos input out of range"
                 )
             )
         );
@@ -52,7 +52,7 @@ public class AcosTests extends AbstractFunctionTestCase {
                 Double.POSITIVE_INFINITY,
                 List.of(
                     "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                    "java.lang.ArithmeticException: Acos input out of range"
+                    "Line -1:-1: java.lang.ArithmeticException: Acos input out of range"
                 )
             )
         );

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/AsinTests.java

@@ -39,7 +39,7 @@ public class AsinTests extends AbstractFunctionTestCase {
                 Math.nextDown(-1d),
                 List.of(
                     "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                    "java.lang.ArithmeticException: Asin input out of range"
+                    "Line -1:-1: java.lang.ArithmeticException: Asin input out of range"
                 )
             )
         );
@@ -52,7 +52,7 @@ public class AsinTests extends AbstractFunctionTestCase {
                 Double.POSITIVE_INFINITY,
                 List.of(
                     "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                    "java.lang.ArithmeticException: Asin input out of range"
+                    "Line -1:-1: java.lang.ArithmeticException: Asin input out of range"
                 )
             )
         );

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/CoshTests.java

@@ -45,7 +45,7 @@ public class CoshTests extends AbstractFunctionTestCase {
                 -711d,
                 List.of(
                     "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                    "java.lang.ArithmeticException: cosh overflow"
+                    "Line -1:-1: java.lang.ArithmeticException: cosh overflow"
                 )
             )
         );
@@ -58,7 +58,7 @@ public class CoshTests extends AbstractFunctionTestCase {
                 Double.POSITIVE_INFINITY,
                 List.of(
                     "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                    "java.lang.ArithmeticException: cosh overflow"
+                    "Line -1:-1: java.lang.ArithmeticException: cosh overflow"
                 )
             )
         );

+ 4 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/Log10Tests.java

@@ -82,7 +82,7 @@ public class Log10Tests extends AbstractFunctionTestCase {
             0,
             List.of(
                 "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                "java.lang.ArithmeticException: Log of non-positive number"
+                "Line -1:-1: java.lang.ArithmeticException: Log of non-positive number"
             )
         );
         TestCaseSupplier.forUnaryLong(
@@ -94,7 +94,7 @@ public class Log10Tests extends AbstractFunctionTestCase {
             0L,
             List.of(
                 "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                "java.lang.ArithmeticException: Log of non-positive number"
+                "Line -1:-1: java.lang.ArithmeticException: Log of non-positive number"
             )
         );
         TestCaseSupplier.forUnaryUnsignedLong(
@@ -106,7 +106,7 @@ public class Log10Tests extends AbstractFunctionTestCase {
             BigInteger.ZERO,
             List.of(
                 "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                "java.lang.ArithmeticException: Log of non-positive number"
+                "Line -1:-1: java.lang.ArithmeticException: Log of non-positive number"
             )
         );
         TestCaseSupplier.forUnaryDouble(
@@ -118,7 +118,7 @@ public class Log10Tests extends AbstractFunctionTestCase {
             0d,
             List.of(
                 "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                "java.lang.ArithmeticException: Log of non-positive number"
+                "Line -1:-1: java.lang.ArithmeticException: Log of non-positive number"
             )
         );
 

+ 5 - 5
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/PowTests.java

@@ -54,7 +54,7 @@ public class PowTests extends AbstractScalarFunctionTestCase {
                     DataTypes.DOUBLE,
                     equalTo(null)
                 ).withWarning("Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.")
-                    .withWarning("java.lang.ArithmeticException: invalid result: pow(NaN, 1.0)")
+                    .withWarning("Line -1:-1: java.lang.ArithmeticException: invalid result: pow(NaN, 1.0)")
             ),
             new TestCaseSupplier(
                 "pow(1, NaN)",
@@ -67,7 +67,7 @@ public class PowTests extends AbstractScalarFunctionTestCase {
                     DataTypes.DOUBLE,
                     equalTo(null)
                 ).withWarning("Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.")
-                    .withWarning("java.lang.ArithmeticException: invalid result: pow(1.0, NaN)")
+                    .withWarning("Line -1:-1: java.lang.ArithmeticException: invalid result: pow(1.0, NaN)")
             ),
             new TestCaseSupplier(
                 "pow(NaN, 0)",
@@ -144,7 +144,7 @@ public class PowTests extends AbstractScalarFunctionTestCase {
                     DataTypes.INTEGER,
                     equalTo(null)
                 ).withWarning("Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.")
-                    .withWarning("java.lang.ArithmeticException: integer overflow")
+                    .withWarning("Line -1:-1: java.lang.ArithmeticException: integer overflow")
             ),
             new TestCaseSupplier(
                 "long overflow case",
@@ -158,7 +158,7 @@ public class PowTests extends AbstractScalarFunctionTestCase {
                     DataTypes.LONG,
                     equalTo(null)
                 ).withWarning("Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.")
-                    .withWarning("java.lang.ArithmeticException: long overflow")
+                    .withWarning("Line -1:-1: java.lang.ArithmeticException: long overflow")
             ),
             new TestCaseSupplier(
                 "pow(2, 0.5) == sqrt(2)",
@@ -213,7 +213,7 @@ public class PowTests extends AbstractScalarFunctionTestCase {
                     DataTypes.DOUBLE,
                     equalTo(null)
                 ).withWarning("Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.")
-                    .withWarning("java.lang.ArithmeticException: invalid result: pow(" + (double) base + ", " + exp + ")");
+                    .withWarning("Line -1:-1: java.lang.ArithmeticException: invalid result: pow(" + (double) base + ", " + exp + ")");
                 return testCase;
             }),
             new TestCaseSupplier(

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/SinhTests.java

@@ -45,7 +45,7 @@ public class SinhTests extends AbstractFunctionTestCase {
                 -711d,
                 List.of(
                     "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                    "java.lang.ArithmeticException: sinh overflow"
+                    "Line -1:-1: java.lang.ArithmeticException: sinh overflow"
                 )
             )
         );
@@ -58,7 +58,7 @@ public class SinhTests extends AbstractFunctionTestCase {
                 Double.POSITIVE_INFINITY,
                 List.of(
                     "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                    "java.lang.ArithmeticException: sinh overflow"
+                    "Line -1:-1: java.lang.ArithmeticException: sinh overflow"
                 )
             )
         );

+ 3 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/math/SqrtTests.java

@@ -80,7 +80,7 @@ public class SqrtTests extends AbstractFunctionTestCase {
             -1,
             List.of(
                 "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                "java.lang.ArithmeticException: Square root of negative"
+                "Line -1:-1: java.lang.ArithmeticException: Square root of negative"
             )
         );
         TestCaseSupplier.forUnaryLong(
@@ -92,7 +92,7 @@ public class SqrtTests extends AbstractFunctionTestCase {
             -1,
             List.of(
                 "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                "java.lang.ArithmeticException: Square root of negative"
+                "Line -1:-1: java.lang.ArithmeticException: Square root of negative"
             )
         );
         TestCaseSupplier.forUnaryDouble(
@@ -104,7 +104,7 @@ public class SqrtTests extends AbstractFunctionTestCase {
             -Double.MIN_VALUE,
             List.of(
                 "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                "java.lang.ArithmeticException: Square root of negative"
+                "Line -1:-1: java.lang.ArithmeticException: Square root of negative"
             )
         );
         return parameterSuppliersFromTypedData(errorsForCasesWithoutExamples(suppliers));

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/AbstractBinaryOperatorTestCase.java

@@ -104,7 +104,7 @@ public abstract class AbstractBinaryOperatorTestCase extends AbstractFunctionTes
                 if (result == null) {
                     assertCriticalWarnings(
                         "Line -1:-1: evaluation of [" + op + "] failed, treating result as null. Only first 20 failures recorded.",
-                        "java.lang.ArithmeticException: " + commonType(lhsType, rhsType).typeName() + " overflow"
+                        "Line -1:-1: java.lang.ArithmeticException: " + commonType(lhsType, rhsType).typeName() + " overflow"
                     );
                 } else {
                     // The type's currently only used for distinguishing between LONG and UNSIGNED_LONG. UL requires both operands be of

+ 2 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/predicate/operator/arithmetic/NegTests.java

@@ -110,7 +110,7 @@ public class NegTests extends AbstractScalarFunctionTestCase {
             assertEquals(null, process(Integer.MIN_VALUE));
             assertCriticalWarnings(
                 "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                "java.lang.ArithmeticException: integer overflow"
+                "Line -1:-1: java.lang.ArithmeticException: integer overflow"
             );
 
             return;
@@ -119,7 +119,7 @@ public class NegTests extends AbstractScalarFunctionTestCase {
             assertEquals(null, process(Long.MIN_VALUE));
             assertCriticalWarnings(
                 "Line -1:-1: evaluation of [] failed, treating result as null. Only first 20 failures recorded.",
-                "java.lang.ArithmeticException: long overflow"
+                "Line -1:-1: java.lang.ArithmeticException: long overflow"
             );
 
             return;

+ 59 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInputTests.java

@@ -8,7 +8,13 @@
 package org.elasticsearch.xpack.esql.io.stream;
 
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.esql.plan.logical.Eval;
+import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
+import org.elasticsearch.xpack.ql.expression.Expression;
 import org.elasticsearch.xpack.ql.expression.NameId;
+import org.elasticsearch.xpack.ql.plan.logical.Filter;
+import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.ql.tree.Source;
 
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -16,7 +22,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
+import static org.elasticsearch.xpack.esql.SerializationTestUtils.serializeDeserialize;
+import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.analyze;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
@@ -86,4 +96,53 @@ public class PlanStreamInputTests extends ESTestCase {
         }
         return longs.stream().toList();
     }
+
+    public void testSourceSerialization() {
+        Function<String, String> queryFn = delimiter -> delimiter
+            + "FROM "
+            + delimiter
+            + " test "
+            + delimiter
+            + "| EVAL "
+            + delimiter
+            + " x = CONCAT(first_name, \"baz\")"
+            + delimiter
+            + "| EVAL last_name IN (\"foo\", "
+            + delimiter
+            + " \"bar\")"
+            + delimiter
+            + "| "
+            + delimiter
+            + "WHERE emp_no == abs("
+            + delimiter
+            + "emp_no)"
+            + delimiter;
+
+        Function<LogicalPlan, List<Source>> sources = plan -> {
+            List<Expression> exp = new ArrayList<>();
+            plan.forEachDown(p -> {
+                if (p instanceof Eval e) {
+                    e.fields().forEach(a -> exp.add(a.child()));
+                } else if (p instanceof Filter f) {
+                    exp.add(f.condition());
+                }
+            });
+            return exp.stream().map(Expression::source).toList();
+        };
+
+        for (var delim : new String[] { "", "\r", "\n", "\r\n" }) {
+            String query = queryFn.apply(delim);
+            EsqlConfiguration config = configuration(query);
+
+            LogicalPlan planIn = analyze(query);
+            LogicalPlan planOut = serializeDeserialize(
+                planIn,
+                PlanStreamOutput::writeLogicalPlanNode,
+                PlanStreamInput::readLogicalPlanNode,
+                config
+            );
+            assertThat(planIn, equalTo(planOut));
+            assertThat(sources.apply(planIn), equalTo(sources.apply(planOut)));
+        }
+    }
 }

+ 2 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java

@@ -11,13 +11,11 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
-import org.elasticsearch.xpack.esql.analysis.Verifier;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.parser.EsqlParser;
 import org.elasticsearch.xpack.esql.plan.logical.Eval;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
-import org.elasticsearch.xpack.esql.stats.Metrics;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 import org.elasticsearch.xpack.ql.expression.Alias;
 import org.elasticsearch.xpack.ql.expression.Expressions;
@@ -36,6 +34,7 @@ import org.junit.BeforeClass;
 import java.util.Map;
 
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.L;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForMissingField;
@@ -64,7 +63,7 @@ public class LocalLogicalPlanOptimizerTests extends ESTestCase {
 
         analyzer = new Analyzer(
             new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), getIndexResult, EsqlTestUtils.emptyPolicyResolution()),
-            new Verifier(new Metrics())
+            TEST_VERIFIER
         );
     }
 

+ 2 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

@@ -15,7 +15,6 @@ import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
-import org.elasticsearch.xpack.esql.analysis.Verifier;
 import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolution;
 import org.elasticsearch.xpack.esql.evaluator.predicate.operator.comparison.Equals;
 import org.elasticsearch.xpack.esql.evaluator.predicate.operator.comparison.GreaterThan;
@@ -46,7 +45,6 @@ import org.elasticsearch.xpack.esql.plan.logical.TopN;
 import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
 import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
-import org.elasticsearch.xpack.esql.stats.Metrics;
 import org.elasticsearch.xpack.ql.expression.Alias;
 import org.elasticsearch.xpack.ql.expression.Attribute;
 import org.elasticsearch.xpack.ql.expression.Expression;
@@ -84,6 +82,7 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.L;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptySource;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
@@ -135,7 +134,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
                 getIndexResult,
                 new EnrichResolution(Set.of(policy), Set.of("languages_idx", "something"))
             ),
-            new Verifier(new Metrics())
+            TEST_VERIFIER
         );
     }
 

+ 2 - 6
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

@@ -27,7 +27,6 @@ import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
-import org.elasticsearch.xpack.esql.analysis.Verifier;
 import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolution;
 import org.elasticsearch.xpack.esql.evaluator.predicate.operator.comparison.Equals;
 import org.elasticsearch.xpack.esql.evaluator.predicate.operator.comparison.GreaterThan;
@@ -64,7 +63,6 @@ import org.elasticsearch.xpack.esql.planner.PlannerUtils;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
-import org.elasticsearch.xpack.esql.stats.Metrics;
 import org.elasticsearch.xpack.esql.stats.SearchStats;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.expression.Attribute;
@@ -90,6 +88,7 @@ import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static org.elasticsearch.core.Tuple.tuple;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
@@ -185,10 +184,7 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
             Set.of("foo")
         );
 
-        analyzer = new Analyzer(
-            new AnalyzerContext(config, functionRegistry, getIndexResult, enrichResolution),
-            new Verifier(new Metrics())
-        );
+        analyzer = new Analyzer(new AnalyzerContext(config, functionRegistry, getIndexResult, enrichResolution), TEST_VERIFIER);
     }
 
     public void testSingleFieldExtractor() {

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/ExpressionTests.java

@@ -84,7 +84,7 @@ public class ExpressionTests extends ESTestCase {
         assertEquals(l(-123, INTEGER), whereExpression("+(-123)"));
         assertEquals(l(-123, INTEGER), whereExpression("+(+(-123))"));
         // we could do better here. ES SQL is smarter and accounts for the number of minuses
-        assertEquals(new Neg(null, l(-123, INTEGER)), whereExpression("-(-123)"));
+        assertEquals(new Neg(EMPTY, l(-123, INTEGER)), whereExpression("-(-123)"));
     }
 
     public void testStringLiterals() {

+ 11 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java

@@ -48,6 +48,7 @@ import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Function;
 
 import static org.elasticsearch.xpack.ql.expression.Literal.FALSE;
 import static org.elasticsearch.xpack.ql.expression.Literal.TRUE;
@@ -500,6 +501,16 @@ public class StatementParserTests extends ESTestCase {
         } while (wsIndex >= 0);
     }
 
+    public void testNewLines() {
+        String[] delims = new String[] { "", "\r", "\n", "\r\n" };
+        Function<String, String> queryFun = d -> d + "from " + d + " foo " + d + "| eval " + d + " x = concat(bar, \"baz\")" + d;
+        LogicalPlan reference = statement(queryFun.apply(delims[0]));
+        for (int i = 1; i < delims.length; i++) {
+            LogicalPlan candidate = statement(queryFun.apply(delims[i]));
+            assertThat(candidate, equalTo(reference));
+        }
+    }
+
     public void testSuggestAvailableSourceCommandsOnParsingError() {
         for (Tuple<String, String> queryWithUnexpectedCmd : List.of(
             Tuple.tuple("frm foo", "frm"),

+ 3 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java

@@ -51,6 +51,7 @@ import org.elasticsearch.xpack.ql.tree.Source;
 import org.elasticsearch.xpack.ql.type.DataType;
 import org.elasticsearch.xpack.ql.type.DataTypes;
 import org.elasticsearch.xpack.ql.type.EsField;
+import org.elasticsearch.xpack.ql.util.StringUtils;
 
 import java.time.Duration;
 import java.time.ZoneOffset;
@@ -72,7 +73,8 @@ public class EvalMapperTests extends ESTestCase {
         null,
         null,
         10000000,
-        10000
+        10000,
+        StringUtils.EMPTY
     );
 
     @ParametersFactory(argumentFormatting = "%1$s")

+ 2 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/FilterTests.java

@@ -20,7 +20,6 @@ import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.SerializationTestUtils;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
-import org.elasticsearch.xpack.esql.analysis.Verifier;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
@@ -29,7 +28,6 @@ import org.elasticsearch.xpack.esql.parser.EsqlParser;
 import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
-import org.elasticsearch.xpack.esql.stats.Metrics;
 import org.elasticsearch.xpack.ql.index.EsIndex;
 import org.elasticsearch.xpack.ql.index.IndexResolution;
 import org.elasticsearch.xpack.ql.type.EsField;
@@ -42,6 +40,7 @@ import java.util.Map;
 
 import static java.util.Arrays.asList;
 import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
 import static org.elasticsearch.xpack.esql.SerializationTestUtils.assertSerialization;
 import static org.elasticsearch.xpack.ql.util.Queries.Clause.FILTER;
@@ -75,7 +74,7 @@ public class FilterTests extends ESTestCase {
 
         analyzer = new Analyzer(
             new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), getIndexResult, EsqlTestUtils.emptyPolicyResolution()),
-            new Verifier(new Metrics())
+            TEST_VERIFIER
         );
     }
 

+ 3 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

@@ -40,6 +40,7 @@ import org.elasticsearch.xpack.ql.index.EsIndex;
 import org.elasticsearch.xpack.ql.tree.Source;
 import org.elasticsearch.xpack.ql.type.DataTypes;
 import org.elasticsearch.xpack.ql.type.EsField;
+import org.elasticsearch.xpack.ql.util.StringUtils;
 import org.hamcrest.Matcher;
 import org.junit.After;
 
@@ -136,7 +137,8 @@ public class LocalExecutionPlannerTests extends MapperServiceTestCase {
             "test_cluser",
             pragmas,
             EsqlPlugin.QUERY_RESULT_TRUNCATION_MAX_SIZE.getDefault(null),
-            EsqlPlugin.QUERY_RESULT_TRUNCATION_DEFAULT_SIZE.getDefault(null)
+            EsqlPlugin.QUERY_RESULT_TRUNCATION_DEFAULT_SIZE.getDefault(null),
+            StringUtils.EMPTY
         );
     }
 

+ 3 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestTests.java

@@ -19,7 +19,6 @@ import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
 import org.elasticsearch.xpack.esql.analysis.AnalyzerContext;
-import org.elasticsearch.xpack.esql.analysis.Verifier;
 import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
 import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
 import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerContext;
@@ -28,7 +27,6 @@ import org.elasticsearch.xpack.esql.parser.EsqlParser;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.planner.Mapper;
 import org.elasticsearch.xpack.esql.session.EsqlConfigurationSerializationTests;
-import org.elasticsearch.xpack.esql.stats.Metrics;
 import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry;
 import org.elasticsearch.xpack.ql.index.EsIndex;
 import org.elasticsearch.xpack.ql.index.IndexResolution;
@@ -40,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyPolicyResolution;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
 
@@ -80,7 +79,7 @@ public class DataNodeRequestTests extends AbstractWireSerializingTestCase<DataNo
         );
         DataNodeRequest request = new DataNodeRequest(
             sessionId,
-            EsqlConfigurationSerializationTests.randomConfiguration(),
+            EsqlConfigurationSerializationTests.randomConfiguration(query),
             shardIds,
             aliasFilters,
             physicalPlan
@@ -168,7 +167,7 @@ public class DataNodeRequestTests extends AbstractWireSerializingTestCase<DataNo
         var logicalOptimizer = new LogicalPlanOptimizer();
         var analyzer = new Analyzer(
             new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), getIndexResult, emptyPolicyResolution()),
-            new Verifier(new Metrics())
+            TEST_VERIFIER
         );
         return logicalOptimizer.optimize(analyzer.analyze(new EsqlParser().createStatement(query)));
     }

+ 11 - 3
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlConfigurationSerializationTests.java

@@ -15,6 +15,8 @@ import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 
 import java.io.IOException;
 
+import static org.elasticsearch.xpack.esql.session.EsqlConfiguration.QUERY_COMPRESS_THRESHOLD_CHARS;
+
 public class EsqlConfigurationSerializationTests extends AbstractWireSerializingTestCase<EsqlConfiguration> {
 
     @Override
@@ -29,6 +31,11 @@ public class EsqlConfigurationSerializationTests extends AbstractWireSerializing
     }
 
     public static EsqlConfiguration randomConfiguration() {
+        int len = randomIntBetween(1, 300) + (frequently() ? 0 : QUERY_COMPRESS_THRESHOLD_CHARS);
+        return randomConfiguration(randomRealisticUnicodeOfLength(len));
+    }
+
+    public static EsqlConfiguration randomConfiguration(String query) {
         var zoneId = randomZone();
         var locale = randomLocale(random());
         var username = randomAlphaOfLengthBetween(1, 10);
@@ -36,7 +43,7 @@ public class EsqlConfigurationSerializationTests extends AbstractWireSerializing
         var truncation = randomNonNegativeInt();
         var defaultTruncation = randomNonNegativeInt();
 
-        return new EsqlConfiguration(zoneId, locale, username, clusterName, randomQueryPragmas(), truncation, defaultTruncation);
+        return new EsqlConfiguration(zoneId, locale, username, clusterName, randomQueryPragmas(), truncation, defaultTruncation, query);
     }
 
     @Override
@@ -46,7 +53,7 @@ public class EsqlConfigurationSerializationTests extends AbstractWireSerializing
 
     @Override
     protected EsqlConfiguration mutateInstance(EsqlConfiguration in) throws IOException {
-        int ordinal = between(0, 6);
+        int ordinal = between(0, 7);
         return new EsqlConfiguration(
             ordinal == 0 ? randomValueOtherThan(in.zoneId(), () -> randomZone().normalized()) : in.zoneId(),
             ordinal == 1 ? randomValueOtherThan(in.locale(), () -> randomLocale(random())) : in.locale(),
@@ -56,7 +63,8 @@ public class EsqlConfigurationSerializationTests extends AbstractWireSerializing
                 ? new QueryPragmas(Settings.builder().put(QueryPragmas.EXCHANGE_BUFFER_SIZE.getKey(), between(1, 10)).build())
                 : in.pragmas(),
             ordinal == 5 ? in.resultTruncationMaxSize() + randomIntBetween(3, 10) : in.resultTruncationMaxSize(),
-            ordinal == 6 ? in.resultTruncationDefaultSize() + randomIntBetween(3, 10) : in.resultTruncationDefaultSize()
+            ordinal == 6 ? in.resultTruncationDefaultSize() + randomIntBetween(3, 10) : in.resultTruncationDefaultSize(),
+            ordinal == 7 ? randomAlphaOfLength(100) : in.query()
         );
     }
 }