Browse Source

ESQL: Make `Source` `Writeable` (#109137)

We'd love to move all of the serialization from `PlanNamedTypes` into
the the types themselves. But serialization for `Source` is in ESQL
proper, not ESQL-core with all of the types that need to be serialized.
This moves that serialization. It does so by making an interface in
ESQL-core that contains the `query` itself which is required to
serialize `Source`. This seems like the simplest way to get started
moving serialization. The shim *should* be temporary, but it'll take
some time to move things around.
Nik Everett 1 year ago
parent
commit
86652bde14

+ 65 - 1
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Source.java

@@ -7,9 +7,17 @@
 
 
 package org.elasticsearch.xpack.esql.core.tree;
 package org.elasticsearch.xpack.esql.core.tree;
 
 
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
+import org.elasticsearch.xpack.esql.core.util.StringUtils;
+
+import java.io.IOException;
 import java.util.Objects;
 import java.util.Objects;
 
 
-public final class Source {
+public final class Source implements Writeable {
 
 
     public static final Source EMPTY = new Source(Location.EMPTY, "");
     public static final Source EMPTY = new Source(Location.EMPTY, "");
 
 
@@ -25,6 +33,31 @@ public final class Source {
         this.text = text;
         this.text = text;
     }
     }
 
 
+    public static <S extends StreamInput & PlanStreamInput> Source readFrom(S in) throws IOException {
+        if (in.readBoolean() == false) {
+            return EMPTY;
+        }
+        int line = in.readInt();
+        int column = in.readInt();
+        int charPositionInLine = column - 1;
+
+        int length = in.readInt();
+        String text = sourceText(in.sourceText(), line, column, length);
+        return new Source(new Location(line, charPositionInLine), text);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        if (this == EMPTY) {
+            out.writeBoolean(false);
+            return;
+        }
+        out.writeBoolean(true);
+        out.writeInt(location.getLineNumber());
+        out.writeInt(location.getColumnNumber());
+        out.writeInt(text.length());
+    }
+
     // TODO: rename to location()
     // TODO: rename to location()
     public Location source() {
     public Location source() {
         return location;
         return location;
@@ -61,4 +94,35 @@ public final class Source {
     public static Source synthetic(String text) {
     public static Source synthetic(String text) {
         return new Source(Location.EMPTY, text);
         return new Source(Location.EMPTY, text);
     }
     }
+
+    private static String sourceText(String query, int line, int column, int length) {
+        if (line <= 0 || column <= 0 || query.isEmpty()) {
+            return StringUtils.EMPTY;
+        }
+        int offset = textOffset(query, line, column);
+        if (offset + length > query.length()) {
+            throw new QlIllegalArgumentException(
+                "location [@" + line + ":" + column + "] and length [" + length + "] overrun query size [" + query.length() + "]"
+            );
+        }
+        return query.substring(offset, offset + length);
+    }
+
+    private static int textOffset(String query, int line, int column) {
+        int offset = 0;
+        if (line > 1) {
+            String[] lines = query.split("\n");
+            if (line > lines.length) {
+                throw new QlIllegalArgumentException(
+                    "line location [" + line + "] higher than max [" + lines.length + "] in query [" + query + "]"
+                );
+            }
+            for (int i = 0; i < line - 1; i++) {
+                offset += lines[i].length() + 1; // +1 accounts for the removed \n
+            }
+        }
+        offset += column - 1; // -1 since column is 1-based indexed
+        return offset;
+    }
+
 }
 }

+ 26 - 0
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/PlanStreamInput.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.core.util;
+
+import org.elasticsearch.xpack.esql.core.tree.Source;
+
+/**
+ * Interface for streams that can serialize plan components. This exists so
+ * ESQL proper can expose streaming capability to ESQL-core. If the world is kind
+ * and just we'll remove this when we flatten everything from ESQL-core into
+ * ESQL proper.
+ */
+public interface PlanStreamInput {
+    /**
+     * The query sent by the user to build this plan. This is used to rebuild
+     * {@link Source} without sending the query over the wire over and over
+     * and over again.
+     */
+    String sourceText();
+
+}

+ 15 - 61
x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/util/SourceUtils.java

@@ -9,8 +9,6 @@ package org.elasticsearch.xpack.esql.core.util;
 
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.core.Nullable;
-import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
 import org.elasticsearch.xpack.esql.core.tree.Location;
 import org.elasticsearch.xpack.esql.core.tree.Location;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.tree.Source;
 
 
@@ -20,74 +18,30 @@ public final class SourceUtils {
 
 
     private SourceUtils() {}
     private SourceUtils() {}
 
 
+    /**
+     * Write a {@link Source} including the text in it.
+     * @deprecated replace with {@link Source#writeTo}.
+     *             That's not binary compatible so the replacement is complex.
+     */
+    @Deprecated
     public static void writeSource(StreamOutput out, Source source) throws IOException {
     public static void writeSource(StreamOutput out, Source source) throws IOException {
-        writeSource(out, source, true);
-    }
-
-    public static void writeSourceNoText(StreamOutput out, Source source) throws IOException {
-        writeSource(out, source, false);
-    }
-
-    public static Source readSource(StreamInput in) throws IOException {
-        return readSource(in, null);
-    }
-
-    public static Source readSourceWithText(StreamInput in, String queryText) throws IOException {
-        return readSource(in, queryText);
-    }
-
-    private static void writeSource(StreamOutput out, Source source, boolean writeText) throws IOException {
         out.writeInt(source.source().getLineNumber());
         out.writeInt(source.source().getLineNumber());
         out.writeInt(source.source().getColumnNumber());
         out.writeInt(source.source().getColumnNumber());
-        if (writeText) {
-            out.writeString(source.text());
-        } else {
-            out.writeInt(source.text().length());
-        }
+        out.writeString(source.text());
     }
     }
 
 
-    private static Source readSource(StreamInput in, @Nullable String queryText) throws IOException {
+    /**
+     * Read a {@link Source} including the text in it.
+     * @deprecated replace with {@link Source#readFrom(StreamInput)}.
+     *             That's not binary compatible so the replacement is complex.
+     */
+    @Deprecated
+    public static Source readSource(StreamInput in) throws IOException {
         int line = in.readInt();
         int line = in.readInt();
         int column = in.readInt();
         int column = in.readInt();
         int charPositionInLine = column - 1;
         int charPositionInLine = column - 1;
 
 
-        String text;
-        if (queryText == null) {
-            text = in.readString();
-        } else {
-            int length = in.readInt();
-            text = sourceText(queryText, line, column, length);
-        }
+        String text = in.readString();
         return new Source(new Location(line, charPositionInLine), text);
         return new Source(new Location(line, charPositionInLine), text);
     }
     }
-
-    private static String sourceText(String query, int line, int column, int length) {
-        if (line <= 0 || column <= 0 || query.isEmpty()) {
-            return StringUtils.EMPTY;
-        }
-        int offset = textOffset(query, line, column);
-        if (offset + length > query.length()) {
-            throw new QlIllegalArgumentException(
-                "location [@" + line + ":" + column + "] and length [" + length + "] overrun query size [" + query.length() + "]"
-            );
-        }
-        return query.substring(offset, offset + length);
-    }
-
-    private static int textOffset(String query, int line, int column) {
-        int offset = 0;
-        if (line > 1) {
-            String[] lines = query.split("\n");
-            if (line > lines.length) {
-                throw new QlIllegalArgumentException(
-                    "line location [" + line + "] higher than max [" + lines.length + "] in query [" + query + "]"
-                );
-            }
-            for (int i = 0; i < line - 1; i++) {
-                offset += lines[i].length() + 1; // +1 accounts for the removed \n
-            }
-        }
-        offset += column - 1; // -1 since column is 1-based indexed
-        return offset;
-    }
 }
 }

File diff suppressed because it is too large
+ 147 - 133
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java


+ 8 - 9
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java

@@ -30,7 +30,6 @@ import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.NameId;
 import org.elasticsearch.xpack.esql.core.expression.NameId;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.core.type.EsField;
 import org.elasticsearch.xpack.esql.core.type.EsField;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanNamedReader;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanNamedReader;
@@ -48,13 +47,13 @@ import java.util.Map;
 import java.util.function.LongFunction;
 import java.util.function.LongFunction;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 
 
-import static org.elasticsearch.xpack.esql.core.util.SourceUtils.readSourceWithText;
-
 /**
 /**
  * A customized stream input used to deserialize ESQL physical plan fragments. Complements stream
  * A customized stream input used to deserialize ESQL physical plan fragments. Complements stream
  * input with methods that read plan nodes, Attributes, Expressions, etc.
  * input with methods that read plan nodes, Attributes, Expressions, etc.
  */
  */
-public final class PlanStreamInput extends NamedWriteableAwareStreamInput {
+public final class PlanStreamInput extends NamedWriteableAwareStreamInput
+    implements
+        org.elasticsearch.xpack.esql.core.util.PlanStreamInput {
 
 
     /**
     /**
      * A Mapper of stream named id, represented as a primitive long value, to NameId instance.
      * A Mapper of stream named id, represented as a primitive long value, to NameId instance.
@@ -123,11 +122,6 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput {
         return readOptionalNamed(PhysicalPlan.class);
         return readOptionalNamed(PhysicalPlan.class);
     }
     }
 
 
-    public Source readSource() throws IOException {
-        boolean hasSource = readBoolean();
-        return hasSource ? readSourceWithText(this, configuration.query()) : Source.EMPTY;
-    }
-
     public Expression readExpression() throws IOException {
     public Expression readExpression() throws IOException {
         return readNamed(Expression.class);
         return readNamed(Expression.class);
     }
     }
@@ -268,6 +262,11 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput {
         }
         }
     }
     }
 
 
+    @Override
+    public String sourceText() {
+        return configuration.query();
+    }
+
     static void throwOnNullOptionalRead(Class<?> type) throws IOException {
     static void throwOnNullOptionalRead(Class<?> type) throws IOException {
         final IOException e = new IOException("read optional named returned null which is not allowed, type:" + type);
         final IOException e = new IOException("read optional named returned null which is not allowed, type:" + type);
         assert false : e;
         assert false : e;

+ 0 - 12
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamOutput.java

@@ -23,7 +23,6 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.esql.core.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.esql.core.tree.Source;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanWriter;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanWriter;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
@@ -33,8 +32,6 @@ import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.function.Function;
 
 
-import static org.elasticsearch.xpack.esql.core.util.SourceUtils.writeSourceNoText;
-
 /**
 /**
  * A customized stream output used to serialize ESQL physical plan fragments. Complements stream
  * A customized stream output used to serialize ESQL physical plan fragments. Complements stream
  * output with methods that write plan nodes, Attributes, Expressions, etc.
  * output with methods that write plan nodes, Attributes, Expressions, etc.
@@ -98,15 +95,6 @@ public final class PlanStreamOutput extends StreamOutput {
         }
         }
     }
     }
 
 
-    public void writeSource(Source source) throws IOException {
-        writeBoolean(true);
-        writeSourceNoText(this, source);
-    }
-
-    public void writeNoSource() throws IOException {
-        writeBoolean(false);
-    }
-
     public void writeExpression(Expression expression) throws IOException {
     public void writeExpression(Expression expression) throws IOException {
         writeNamed(Expression.class, expression);
         writeNamed(Expression.class, expression);
     }
     }

+ 0 - 93
x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/util/SourceUtils.java

@@ -1,93 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.xpack.ql.util;
-
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.core.Nullable;
-import org.elasticsearch.xpack.ql.QlIllegalArgumentException;
-import org.elasticsearch.xpack.ql.tree.Location;
-import org.elasticsearch.xpack.ql.tree.Source;
-
-import java.io.IOException;
-
-public final class SourceUtils {
-
-    private SourceUtils() {}
-
-    public static void writeSource(StreamOutput out, Source source) throws IOException {
-        writeSource(out, source, true);
-    }
-
-    public static void writeSourceNoText(StreamOutput out, Source source) throws IOException {
-        writeSource(out, source, false);
-    }
-
-    public static Source readSource(StreamInput in) throws IOException {
-        return readSource(in, null);
-    }
-
-    public static Source readSourceWithText(StreamInput in, String queryText) throws IOException {
-        return readSource(in, queryText);
-    }
-
-    private static void writeSource(StreamOutput out, Source source, boolean writeText) throws IOException {
-        out.writeInt(source.source().getLineNumber());
-        out.writeInt(source.source().getColumnNumber());
-        if (writeText) {
-            out.writeString(source.text());
-        } else {
-            out.writeInt(source.text().length());
-        }
-    }
-
-    private static Source readSource(StreamInput in, @Nullable String queryText) throws IOException {
-        int line = in.readInt();
-        int column = in.readInt();
-        int charPositionInLine = column - 1;
-
-        String text;
-        if (queryText == null) {
-            text = in.readString();
-        } else {
-            int length = in.readInt();
-            text = sourceText(queryText, line, column, length);
-        }
-        return new Source(new Location(line, charPositionInLine), text);
-    }
-
-    private static String sourceText(String query, int line, int column, int length) {
-        if (line <= 0 || column <= 0 || query.isEmpty()) {
-            return StringUtils.EMPTY;
-        }
-        int offset = textOffset(query, line, column);
-        if (offset + length > query.length()) {
-            throw new QlIllegalArgumentException(
-                "location [@" + line + ":" + column + "] and length [" + length + "] overrun query size [" + query.length() + "]"
-            );
-        }
-        return query.substring(offset, offset + length);
-    }
-
-    private static int textOffset(String query, int line, int column) {
-        int offset = 0;
-        if (line > 1) {
-            String[] lines = query.split("\n");
-            if (line > lines.length) {
-                throw new QlIllegalArgumentException(
-                    "line location [" + line + "] higher than max [" + lines.length + "] in query [" + query + "]"
-                );
-            }
-            for (int i = 0; i < line - 1; i++) {
-                offset += lines[i].length() + 1; // +1 accounts for the removed \n
-            }
-        }
-        offset += column - 1; // -1 since column is 1-based indexed
-        return offset;
-    }
-}

Some files were not shown because too many files changed in this diff