Browse Source

SQL: Improve serialization of SQL processors (#45678)

Encapsulate the serialization/deserialization of SQL client classes.
Make configuration specific parameters (such as ZoneId) generic just
like the version and remove the need for consumer classes to manage them
individually.
This is not only consistent but also provides significant savings in the
cursor.

Fix #40216
Costin Leau 6 years ago
parent
commit
5c84479804
32 changed files with 341 additions and 128 deletions
  1. 2 2
      x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/AbstractSqlQueryRequest.java
  2. 54 0
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/common/io/SqlStreamInput.java
  3. 41 0
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/common/io/SqlStreamOutput.java
  4. 3 2
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/CompositeKeyExtractor.java
  5. 3 2
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/FieldHitExtractor.java
  6. 3 2
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/MetricAggExtractor.java
  7. 2 2
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/TopHitsAggExtractor.java
  8. 2 7
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/BaseDateTimeProcessor.java
  9. 0 1
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/DateTimeProcessor.java
  10. 0 1
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/NamedDateTimeProcessor.java
  11. 0 1
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/NonIsoDateTimeProcessor.java
  12. 7 2
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/QuarterProcessor.java
  13. 1 1
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java
  14. 7 3
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormat.java
  15. 6 7
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java
  16. 32 30
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java
  17. 54 0
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/AbstractSqlWireSerializingTestCase.java
  18. 18 20
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java
  19. 3 5
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java
  20. 13 9
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/CompositeKeyExtractorTests.java
  21. 4 4
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/ComputingExtractorTests.java
  22. 7 2
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/FieldHitExtractorTests.java
  23. 15 2
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/MetricAggExtractorTests.java
  24. 7 2
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TopHitsAggExtractorTests.java
  25. 7 2
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/DateTimeProcessorTests.java
  26. 7 2
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/NamedDateTimeProcessorTests.java
  27. 7 2
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/NonIsoDateTimeProcessorTests.java
  28. 7 2
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/TimeProcessorTests.java
  29. 2 3
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/gen/processor/ChainingProcessorTests.java
  30. 7 6
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java
  31. 18 0
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/CursorsTestUtil.java
  32. 2 4
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java

+ 2 - 2
x-pack/plugin/sql/sql-action/src/main/java/org/elasticsearch/xpack/sql/action/AbstractSqlQueryRequest.java

@@ -194,7 +194,7 @@ public abstract class AbstractSqlQueryRequest extends AbstractSqlRequest impleme
         super(in);
         super(in);
         query = in.readString();
         query = in.readString();
         params = in.readList(AbstractSqlQueryRequest::readSqlTypedParamValue);
         params = in.readList(AbstractSqlQueryRequest::readSqlTypedParamValue);
-        zoneId = ZoneId.of(in.readString());
+        zoneId = in.readZoneId();
         fetchSize = in.readVInt();
         fetchSize = in.readVInt();
         requestTimeout = in.readTimeValue();
         requestTimeout = in.readTimeValue();
         pageTimeout = in.readTimeValue();
         pageTimeout = in.readTimeValue();
@@ -218,7 +218,7 @@ public abstract class AbstractSqlQueryRequest extends AbstractSqlRequest impleme
         for (SqlTypedParamValue param: params) {
         for (SqlTypedParamValue param: params) {
             writeSqlTypedParamValue(out, param);
             writeSqlTypedParamValue(out, param);
         }
         }
-        out.writeString(zoneId.getId());
+        out.writeZoneId(zoneId);
         out.writeVInt(fetchSize);
         out.writeVInt(fetchSize);
         out.writeTimeValue(requestTimeout);
         out.writeTimeValue(requestTimeout);
         out.writeTimeValue(pageTimeout);
         out.writeTimeValue(pageTimeout);

+ 54 - 0
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/common/io/SqlStreamInput.java

@@ -0,0 +1,54 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.sql.common.io;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.util.Base64;
+
+/**
+ * SQL-specific stream extension for {@link StreamInput} used for deserializing
+ * SQL components, especially on the client-side.
+ */
+public class SqlStreamInput extends NamedWriteableAwareStreamInput {
+
+    private final ZoneId zoneId;
+
+    public SqlStreamInput(String base64encoded, NamedWriteableRegistry namedWriteableRegistry, Version version) throws IOException {
+        this(Base64.getDecoder().decode(base64encoded), namedWriteableRegistry, version);
+    }
+
+    public SqlStreamInput(byte[] input, NamedWriteableRegistry namedWriteableRegistry, Version version) throws IOException {
+        super(StreamInput.wrap(input), namedWriteableRegistry);
+
+        // version check first
+        Version ver = Version.readVersion(delegate);
+        if (version.compareTo(ver) != 0) {
+            throw new SqlIllegalArgumentException("Unsupported cursor version [{}], expected [{}]", ver, version);
+        }
+        delegate.setVersion(version);
+        // configuration settings
+        zoneId = delegate.readZoneId();
+    }
+
+    public ZoneId zoneId() {
+        return zoneId;
+    }
+
+    public static SqlStreamInput asSqlStream(StreamInput in) {
+        if (in instanceof SqlStreamInput) {
+            return (SqlStreamInput) in;
+        }
+        throw new SqlIllegalArgumentException("Expected SQL cursor stream, received [{}]", in.getClass());
+    }
+}

+ 41 - 0
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/common/io/SqlStreamOutput.java

@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.sql.common.io;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+import java.util.Base64;
+
+public class SqlStreamOutput extends OutputStreamStreamOutput {
+
+    private final ByteArrayOutputStream bytes;
+
+    public SqlStreamOutput(Version version, ZoneId zoneId) throws IOException {
+        this(new ByteArrayOutputStream(), version, zoneId);
+    }
+
+    private SqlStreamOutput(ByteArrayOutputStream bytes, Version version, ZoneId zoneId) throws IOException {
+        super(Base64.getEncoder().wrap(new OutputStreamStreamOutput(bytes)));
+        this.bytes = bytes;
+
+        Version.writeVersion(version, this);
+        writeZoneId(zoneId);
+    }
+
+    /**
+     * Should be called _after_ closing the stream - there are no guarantees otherwise.
+     */
+    public String streamAsString() {
+        // Base64 uses this encoding instead of UTF-8
+        return bytes.toString(StandardCharsets.ISO_8859_1);
+    }
+}

+ 3 - 2
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/CompositeKeyExtractor.java

@@ -9,6 +9,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
+import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
 import org.elasticsearch.xpack.sql.querydsl.container.GroupByRef.Property;
 import org.elasticsearch.xpack.sql.querydsl.container.GroupByRef.Property;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 
 
@@ -42,15 +43,15 @@ public class CompositeKeyExtractor implements BucketExtractor {
     CompositeKeyExtractor(StreamInput in) throws IOException {
     CompositeKeyExtractor(StreamInput in) throws IOException {
         key = in.readString();
         key = in.readString();
         property = in.readEnum(Property.class);
         property = in.readEnum(Property.class);
-        zoneId = ZoneId.of(in.readString());
         isDateTimeBased = in.readBoolean();
         isDateTimeBased = in.readBoolean();
+
+        zoneId = SqlStreamInput.asSqlStream(in).zoneId();
     }
     }
 
 
     @Override
     @Override
     public void writeTo(StreamOutput out) throws IOException {
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(key);
         out.writeString(key);
         out.writeEnum(property);
         out.writeEnum(property);
-        out.writeString(zoneId.getId());
         out.writeBoolean(isDateTimeBased);
         out.writeBoolean(isDateTimeBased);
     }
     }
 
 

+ 3 - 2
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/FieldHitExtractor.java

@@ -17,6 +17,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.mapper.IgnoredFieldMapper;
 import org.elasticsearch.index.mapper.IgnoredFieldMapper;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
+import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
 import org.elasticsearch.xpack.sql.expression.function.scalar.geo.GeoShape;
 import org.elasticsearch.xpack.sql.expression.function.scalar.geo.GeoShape;
 import org.elasticsearch.xpack.sql.type.DataType;
 import org.elasticsearch.xpack.sql.type.DataType;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 import org.elasticsearch.xpack.sql.util.DateUtils;
@@ -95,11 +96,12 @@ public class FieldHitExtractor implements HitExtractor {
         }
         }
         String esType = in.readOptionalString();
         String esType = in.readOptionalString();
         dataType = esType != null ? DataType.fromTypeName(esType) : null;
         dataType = esType != null ? DataType.fromTypeName(esType) : null;
-        zoneId = ZoneId.of(in.readString());
         useDocValue = in.readBoolean();
         useDocValue = in.readBoolean();
         hitName = in.readOptionalString();
         hitName = in.readOptionalString();
         arrayLeniency = in.readBoolean();
         arrayLeniency = in.readBoolean();
         path = sourcePath(fieldName, useDocValue, hitName);
         path = sourcePath(fieldName, useDocValue, hitName);
+
+        zoneId = SqlStreamInput.asSqlStream(in).zoneId();
     }
     }
 
 
     @Override
     @Override
@@ -114,7 +116,6 @@ public class FieldHitExtractor implements HitExtractor {
             out.writeOptionalString(fullFieldName);
             out.writeOptionalString(fullFieldName);
         }
         }
         out.writeOptionalString(dataType == null ? null : dataType.typeName);
         out.writeOptionalString(dataType == null ? null : dataType.typeName);
-        out.writeString(zoneId.getId());
         out.writeBoolean(useDocValue);
         out.writeBoolean(useDocValue);
         out.writeOptionalString(hitName);
         out.writeOptionalString(hitName);
         out.writeBoolean(arrayLeniency);
         out.writeBoolean(arrayLeniency);

+ 3 - 2
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/MetricAggExtractor.java

@@ -21,6 +21,7 @@ import org.elasticsearch.search.aggregations.metrics.InternalSum;
 import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks;
 import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentileRanks;
 import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
 import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
+import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
 import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
 import org.elasticsearch.xpack.sql.querydsl.agg.Aggs;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 
 
@@ -55,7 +56,8 @@ public class MetricAggExtractor implements BucketExtractor {
         property = in.readString();
         property = in.readString();
         innerKey = in.readOptionalString();
         innerKey = in.readOptionalString();
         isDateTimeBased = in.readBoolean();
         isDateTimeBased = in.readBoolean();
-        zoneId = ZoneId.of(in.readString());
+
+        zoneId = SqlStreamInput.asSqlStream(in).zoneId();
     }
     }
 
 
     @Override
     @Override
@@ -64,7 +66,6 @@ public class MetricAggExtractor implements BucketExtractor {
         out.writeString(property);
         out.writeString(property);
         out.writeOptionalString(innerKey);
         out.writeOptionalString(innerKey);
         out.writeBoolean(isDateTimeBased);
         out.writeBoolean(isDateTimeBased);
-        out.writeString(zoneId.getId());
     }
     }
 
 
     String name() {
     String name() {

+ 2 - 2
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/TopHitsAggExtractor.java

@@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
 import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
+import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
 import org.elasticsearch.xpack.sql.type.DataType;
 import org.elasticsearch.xpack.sql.type.DataType;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 
 
@@ -34,14 +35,13 @@ public class TopHitsAggExtractor implements BucketExtractor {
     TopHitsAggExtractor(StreamInput in) throws IOException {
     TopHitsAggExtractor(StreamInput in) throws IOException {
         name = in.readString();
         name = in.readString();
         fieldDataType = in.readEnum(DataType.class);
         fieldDataType = in.readEnum(DataType.class);
-        zoneId = ZoneId.of(in.readString());
+        zoneId = SqlStreamInput.asSqlStream(in).zoneId();
     }
     }
 
 
     @Override
     @Override
     public void writeTo(StreamOutput out) throws IOException {
     public void writeTo(StreamOutput out) throws IOException {
         out.writeString(name);
         out.writeString(name);
         out.writeEnum(fieldDataType);
         out.writeEnum(fieldDataType);
-        out.writeString(zoneId.getId());
     }
     }
 
 
     String name() {
     String name() {

+ 2 - 7
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/BaseDateTimeProcessor.java

@@ -7,8 +7,8 @@
 package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
+import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
 import org.elasticsearch.xpack.sql.expression.gen.processor.Processor;
 import org.elasticsearch.xpack.sql.expression.gen.processor.Processor;
 
 
 import java.io.IOException;
 import java.io.IOException;
@@ -24,12 +24,7 @@ public abstract class BaseDateTimeProcessor implements Processor {
     }
     }
 
 
     BaseDateTimeProcessor(StreamInput in) throws IOException {
     BaseDateTimeProcessor(StreamInput in) throws IOException {
-        zoneId = ZoneId.of(in.readString());
-    }
-
-    @Override
-    public void writeTo(StreamOutput out) throws IOException {
-        out.writeString(zoneId.getId());
+        zoneId = SqlStreamInput.asSqlStream(in).zoneId();
     }
     }
 
 
     ZoneId zoneId() {
     ZoneId zoneId() {

+ 0 - 1
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/DateTimeProcessor.java

@@ -63,7 +63,6 @@ public class DateTimeProcessor extends BaseDateTimeProcessor {
 
 
     @Override
     @Override
     public void writeTo(StreamOutput out) throws IOException {
     public void writeTo(StreamOutput out) throws IOException {
-        super.writeTo(out);
         out.writeEnum(extractor);
         out.writeEnum(extractor);
     }
     }
 
 

+ 0 - 1
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/NamedDateTimeProcessor.java

@@ -57,7 +57,6 @@ public class NamedDateTimeProcessor extends BaseDateTimeProcessor {
 
 
     @Override
     @Override
     public void writeTo(StreamOutput out) throws IOException {
     public void writeTo(StreamOutput out) throws IOException {
-        super.writeTo(out);
         out.writeEnum(extractor);
         out.writeEnum(extractor);
     }
     }
 
 

+ 0 - 1
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/NonIsoDateTimeProcessor.java

@@ -72,7 +72,6 @@ public class NonIsoDateTimeProcessor extends BaseDateTimeProcessor {
 
 
     @Override
     @Override
     public void writeTo(StreamOutput out) throws IOException {
     public void writeTo(StreamOutput out) throws IOException {
-        super.writeTo(out);
         out.writeEnum(extractor);
         out.writeEnum(extractor);
     }
     }
 
 

+ 7 - 2
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/QuarterProcessor.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 
 
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.time.ZoneId;
 import java.time.ZoneId;
@@ -16,6 +17,10 @@ import java.util.Locale;
 import java.util.Objects;
 import java.util.Objects;
 
 
 public class QuarterProcessor extends BaseDateTimeProcessor {
 public class QuarterProcessor extends BaseDateTimeProcessor {
+
+    public static final String NAME = "q";
+    private static final DateTimeFormatter QUARTER_FORMAT = DateTimeFormatter.ofPattern("q", Locale.ROOT);
+
     
     
     public QuarterProcessor(ZoneId zoneId) {
     public QuarterProcessor(ZoneId zoneId) {
         super(zoneId);
         super(zoneId);
@@ -25,8 +30,8 @@ public class QuarterProcessor extends BaseDateTimeProcessor {
         super(in);
         super(in);
     }
     }
     
     
-    public static final String NAME = "q";
-    private static final DateTimeFormatter QUARTER_FORMAT = DateTimeFormatter.ofPattern("q", Locale.ROOT);
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {}
 
 
     @Override
     @Override
     public String getWriteableName() {
     public String getWriteableName() {

+ 1 - 1
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlQueryAction.java

@@ -110,7 +110,7 @@ public class RestSqlQueryAction extends BaseRestHandler {
                     final String data = textFormat.format(request, response);
                     final String data = textFormat.format(request, response);
 
 
                     restResponse = new BytesRestResponse(RestStatus.OK, textFormat.contentType(request),
                     restResponse = new BytesRestResponse(RestStatus.OK, textFormat.contentType(request),
-                            data.getBytes(StandardCharsets.UTF_8));
+                        data.getBytes(StandardCharsets.UTF_8));
 
 
                     if (response.hasCursor()) {
                     if (response.hasCursor()) {
                         restResponse.addHeader("Cursor", response.cursor());
                         restResponse.addHeader("Cursor", response.cursor());

+ 7 - 3
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TextFormat.java

@@ -5,8 +5,8 @@
  */
  */
 package org.elasticsearch.xpack.sql.plugin;
 package org.elasticsearch.xpack.sql.plugin;
 
 
-import org.elasticsearch.Version;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.action.BasicFormatter;
 import org.elasticsearch.xpack.sql.action.BasicFormatter;
@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.sql.session.Cursors;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 import org.elasticsearch.xpack.sql.util.StringUtils;
 import org.elasticsearch.xpack.sql.util.StringUtils;
 
 
+import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.ZonedDateTime;
 import java.util.List;
 import java.util.List;
 import java.util.Locale;
 import java.util.Locale;
@@ -43,10 +44,13 @@ enum TextFormat {
         String format(RestRequest request, SqlQueryResponse response) {
         String format(RestRequest request, SqlQueryResponse response) {
             BasicFormatter formatter = null;
             BasicFormatter formatter = null;
             Cursor cursor = null;
             Cursor cursor = null;
+            ZoneId zoneId = null;
 
 
             // check if the cursor is already wrapped first
             // check if the cursor is already wrapped first
             if (response.hasCursor()) {
             if (response.hasCursor()) {
-                cursor = Cursors.decodeFromString(response.cursor());
+                Tuple<Cursor, ZoneId> tuple = Cursors.decodeFromStringWithZone(response.cursor());
+                cursor = tuple.v1();
+                zoneId = tuple.v2();
                 if (cursor instanceof TextFormatterCursor) {
                 if (cursor instanceof TextFormatterCursor) {
                     formatter = ((TextFormatterCursor) cursor).getFormatter();
                     formatter = ((TextFormatterCursor) cursor).getFormatter();
                 }
                 }
@@ -58,7 +62,7 @@ enum TextFormat {
                 formatter = new BasicFormatter(response.columns(), response.rows(), TEXT);
                 formatter = new BasicFormatter(response.columns(), response.rows(), TEXT);
                 // if there's a cursor, wrap the formatter in it
                 // if there's a cursor, wrap the formatter in it
                 if (cursor != null) {
                 if (cursor != null) {
-                    response.cursor(Cursors.encodeToString(Version.CURRENT, new TextFormatterCursor(cursor, formatter)));
+                    response.cursor(Cursors.encodeToString(new TextFormatterCursor(cursor, formatter), zoneId));
                 }
                 }
                 // format with header
                 // format with header
                 return formatter.formatWithHeader(response.columns(), response.rows());
                 return formatter.formatWithHeader(response.columns(), response.rows());

+ 6 - 7
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/plugin/TransportSqlQueryAction.java

@@ -5,7 +5,6 @@
  */
  */
 package org.elasticsearch.xpack.sql.plugin;
 package org.elasticsearch.xpack.sql.plugin;
 
 
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.HandledTransportAction;
@@ -81,7 +80,7 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
                     wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure));
                     wrap(p -> listener.onResponse(createResponseWithSchema(request, p)), listener::onFailure));
         } else {
         } else {
             planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
             planExecutor.nextPage(cfg, Cursors.decodeFromString(request.cursor()),
-                    wrap(p -> listener.onResponse(createResponse(request.mode(), request.columnar(), null, p)),
+                    wrap(p -> listener.onResponse(createResponse(request, null, p)),
                             listener::onFailure));
                             listener::onFailure));
         }
         }
     }
     }
@@ -102,10 +101,10 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
             }
             }
         }
         }
         columns = unmodifiableList(columns);
         columns = unmodifiableList(columns);
-        return createResponse(request.mode(), request.columnar(), columns, page);
+        return createResponse(request, columns, page);
     }
     }
 
 
-    static SqlQueryResponse createResponse(Mode mode, boolean columnar, List<ColumnInfo> header, Page page) {
+    static SqlQueryResponse createResponse(SqlQueryRequest request, List<ColumnInfo> header, Page page) {
         List<List<Object>> rows = new ArrayList<>();
         List<List<Object>> rows = new ArrayList<>();
         page.rowSet().forEachRow(rowView -> {
         page.rowSet().forEachRow(rowView -> {
             List<Object> row = new ArrayList<>(rowView.columnCount());
             List<Object> row = new ArrayList<>(rowView.columnCount());
@@ -114,9 +113,9 @@ public class TransportSqlQueryAction extends HandledTransportAction<SqlQueryRequ
         });
         });
 
 
         return new SqlQueryResponse(
         return new SqlQueryResponse(
-                Cursors.encodeToString(Version.CURRENT, page.next()),
-                mode,
-                columnar,
+                Cursors.encodeToString(page.next(), request.zoneId()),
+                request.mode(),
+                request.columnar(),
                 header,
                 header,
                 rows);
                 rows);
     }
     }

+ 32 - 30
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/Cursors.java

@@ -6,13 +6,12 @@
 package org.elasticsearch.xpack.sql.session;
 package org.elasticsearch.xpack.sql.session;
 
 
 import org.elasticsearch.Version;
 import org.elasticsearch.Version;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.NamedWriteable;
-import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
+import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
+import org.elasticsearch.xpack.sql.common.io.SqlStreamOutput;
 import org.elasticsearch.xpack.sql.execution.search.CompositeAggregationCursor;
 import org.elasticsearch.xpack.sql.execution.search.CompositeAggregationCursor;
 import org.elasticsearch.xpack.sql.execution.search.ScrollCursor;
 import org.elasticsearch.xpack.sql.execution.search.ScrollCursor;
 import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractors;
 import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractors;
@@ -22,11 +21,9 @@ import org.elasticsearch.xpack.sql.expression.literal.Literals;
 import org.elasticsearch.xpack.sql.plugin.TextFormatterCursor;
 import org.elasticsearch.xpack.sql.plugin.TextFormatterCursor;
 import org.elasticsearch.xpack.sql.util.StringUtils;
 import org.elasticsearch.xpack.sql.util.StringUtils;
 
 
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
+import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.List;
 import java.util.List;
 
 
 /**
 /**
@@ -35,6 +32,7 @@ import java.util.List;
 public final class Cursors {
 public final class Cursors {
 
 
     private static final NamedWriteableRegistry WRITEABLE_REGISTRY = new NamedWriteableRegistry(getNamedWriteables());
     private static final NamedWriteableRegistry WRITEABLE_REGISTRY = new NamedWriteableRegistry(getNamedWriteables());
+    private static final Version VERSION = Version.CURRENT;
 
 
     private Cursors() {}
     private Cursors() {}
 
 
@@ -65,17 +63,20 @@ public final class Cursors {
     /**
     /**
      * Write a {@linkplain Cursor} to a string for serialization across xcontent.
      * Write a {@linkplain Cursor} to a string for serialization across xcontent.
      */
      */
-    public static String encodeToString(Version version, Cursor info) {
+    public static String encodeToString(Cursor info, ZoneId zoneId) {
+        return encodeToString(info, VERSION, zoneId);
+    }
+
+    static String encodeToString(Cursor info, Version version, ZoneId zoneId) {
         if (info == Cursor.EMPTY) {
         if (info == Cursor.EMPTY) {
             return StringUtils.EMPTY;
             return StringUtils.EMPTY;
         }
         }
-        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
-            try (OutputStream base64 = Base64.getEncoder().wrap(os); StreamOutput out = new OutputStreamStreamOutput(base64)) {
-                Version.writeVersion(version, out);
-                out.writeNamedWriteable(info);
-            }
-            return os.toString(StandardCharsets.UTF_8.name());
-        } catch (Exception ex) {
+        try (SqlStreamOutput output = new SqlStreamOutput(version, zoneId)) {
+            output.writeNamedWriteable(info);
+            output.close();
+            // return the string only after closing the resource
+            return output.streamAsString();
+        } catch (IOException ex) {
             throw new SqlIllegalArgumentException("Unexpected failure retrieving next page", ex);
             throw new SqlIllegalArgumentException("Unexpected failure retrieving next page", ex);
         }
         }
     }
     }
@@ -84,22 +85,23 @@ public final class Cursors {
     /**
     /**
      * Read a {@linkplain Cursor} from a string.
      * Read a {@linkplain Cursor} from a string.
      */
      */
-    public static Cursor decodeFromString(String info) {
-        if (info.isEmpty()) {
-            return Cursor.EMPTY;
+    public static Cursor decodeFromString(String base64) {
+        return decodeFromStringWithZone(base64).v1();
+    }
+
+    /**
+     * Read a {@linkplain Cursor} from a string.
+     */
+    public static Tuple<Cursor, ZoneId> decodeFromStringWithZone(String base64) {
+        if (base64.isEmpty()) {
+            return new Tuple<>(Cursor.EMPTY, null);
         }
         }
-        byte[] bytes = info.getBytes(StandardCharsets.UTF_8);
-        try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(Base64.getDecoder().decode(bytes)), WRITEABLE_REGISTRY)) {
-            Version version = Version.readVersion(in);
-            if (version.after(Version.CURRENT)) {
-                throw new SqlIllegalArgumentException("Unsupported cursor version " + version);
-            }
-            in.setVersion(version);
-            return in.readNamedWriteable(Cursor.class);
-        } catch (SqlIllegalArgumentException ex) {
-            throw ex;
-        } catch (Exception ex) {
+        try (SqlStreamInput in = new SqlStreamInput(base64, WRITEABLE_REGISTRY, VERSION)) {
+            Cursor cursor = in.readNamedWriteable(Cursor.class);
+            return new Tuple<>(cursor, in.zoneId());
+        } catch (IOException ex) {
             throw new SqlIllegalArgumentException("Unexpected failure decoding cursor", ex);
             throw new SqlIllegalArgumentException("Unexpected failure decoding cursor", ex);
         }
         }
     }
     }
+
 }
 }

+ 54 - 0
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/AbstractSqlWireSerializingTestCase.java

@@ -0,0 +1,54 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.sql;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireTestCase;
+import org.elasticsearch.xpack.sql.common.io.SqlStreamInput;
+import org.elasticsearch.xpack.sql.common.io.SqlStreamOutput;
+import org.elasticsearch.xpack.sql.session.Cursors;
+
+import java.io.IOException;
+import java.time.ZoneId;
+
+public abstract class AbstractSqlWireSerializingTestCase<T extends Writeable> extends AbstractWireTestCase<T> {
+
+    @Override
+    protected T copyInstance(T instance, Version version) throws IOException {
+        try (BytesStreamOutput output = new BytesStreamOutput()) {
+            ZoneId zoneId = instanceZoneId(instance);
+            SqlStreamOutput out = new SqlStreamOutput(version, zoneId);
+            instance.writeTo(out);
+            out.close();
+            try (SqlStreamInput in = new SqlStreamInput(out.streamAsString(), getNamedWriteableRegistry(), version)) {
+                return instanceReader().read(in);
+            }
+        }
+    }
+
+    protected ZoneId instanceZoneId(T instance) {
+        return randomSafeZone();
+    }
+
+    @Override
+    protected NamedWriteableRegistry getNamedWriteableRegistry() {
+        return new NamedWriteableRegistry(Cursors.getNamedWriteables());
+    }
+
+
+    /**
+     * We need to exclude SystemV/* time zones because they cannot be converted
+     * back to DateTimeZone which we currently still need to do internally,
+     * e.g. in bwc serialization and in the extract() method
+     */
+    protected static ZoneId randomSafeZone() {
+        return randomValueOtherThanMany(zi -> zi.getId().startsWith("SystemV"), () -> randomZone());
+    }
+}

+ 18 - 20
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/CompositeAggregationCursorTests.java

@@ -5,39 +5,38 @@
  */
  */
 package org.elasticsearch.xpack.sql.execution.search;
 package org.elasticsearch.xpack.sql.execution.search;
 
 
-import org.elasticsearch.Version;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor;
 import org.elasticsearch.xpack.sql.execution.search.extractor.BucketExtractor;
 import org.elasticsearch.xpack.sql.execution.search.extractor.CompositeKeyExtractorTests;
 import org.elasticsearch.xpack.sql.execution.search.extractor.CompositeKeyExtractorTests;
 import org.elasticsearch.xpack.sql.execution.search.extractor.ConstantExtractorTests;
 import org.elasticsearch.xpack.sql.execution.search.extractor.ConstantExtractorTests;
 import org.elasticsearch.xpack.sql.execution.search.extractor.MetricAggExtractorTests;
 import org.elasticsearch.xpack.sql.execution.search.extractor.MetricAggExtractorTests;
-import org.elasticsearch.xpack.sql.session.Cursors;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.BitSet;
 import java.util.List;
 import java.util.List;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 
 
-public class CompositeAggregationCursorTests extends AbstractWireSerializingTestCase<CompositeAggregationCursor> {
+public class CompositeAggregationCursorTests extends AbstractSqlWireSerializingTestCase<CompositeAggregationCursor> {
     public static CompositeAggregationCursor randomCompositeCursor() {
     public static CompositeAggregationCursor randomCompositeCursor() {
         int extractorsSize = between(1, 20);
         int extractorsSize = between(1, 20);
+        ZoneId id = randomSafeZone();
         List<BucketExtractor> extractors = new ArrayList<>(extractorsSize);
         List<BucketExtractor> extractors = new ArrayList<>(extractorsSize);
         for (int i = 0; i < extractorsSize; i++) {
         for (int i = 0; i < extractorsSize; i++) {
-            extractors.add(randomBucketExtractor());
+            extractors.add(randomBucketExtractor(id));
         }
         }
 
 
         return new CompositeAggregationCursor(new byte[randomInt(256)], extractors, randomBitSet(extractorsSize),
         return new CompositeAggregationCursor(new byte[randomInt(256)], extractors, randomBitSet(extractorsSize),
                 randomIntBetween(10, 1024), randomBoolean(), randomAlphaOfLength(5));
                 randomIntBetween(10, 1024), randomBoolean(), randomAlphaOfLength(5));
     }
     }
 
 
-    static BucketExtractor randomBucketExtractor() {
+    static BucketExtractor randomBucketExtractor(ZoneId zoneId) {
         List<Supplier<BucketExtractor>> options = new ArrayList<>();
         List<Supplier<BucketExtractor>> options = new ArrayList<>();
         options.add(ConstantExtractorTests::randomConstantExtractor);
         options.add(ConstantExtractorTests::randomConstantExtractor);
-        options.add(MetricAggExtractorTests::randomMetricAggExtractor);
-        options.add(CompositeKeyExtractorTests::randomCompositeKeyExtractor);
+        options.add(() -> MetricAggExtractorTests.randomMetricAggExtractor(zoneId));
+        options.add(() -> CompositeKeyExtractorTests.randomCompositeKeyExtractor(zoneId));
         return randomFrom(options).get();
         return randomFrom(options).get();
     }
     }
 
 
@@ -50,11 +49,6 @@ public class CompositeAggregationCursorTests extends AbstractWireSerializingTest
                 instance.indices());
                 instance.indices());
     }
     }
 
 
-    @Override
-    protected NamedWriteableRegistry getNamedWriteableRegistry() {
-        return new NamedWriteableRegistry(Cursors.getNamedWriteables());
-    }
-
     @Override
     @Override
     protected CompositeAggregationCursor createTestInstance() {
     protected CompositeAggregationCursor createTestInstance() {
         return randomCompositeCursor();
         return randomCompositeCursor();
@@ -66,13 +60,17 @@ public class CompositeAggregationCursorTests extends AbstractWireSerializingTest
     }
     }
 
 
     @Override
     @Override
-    protected CompositeAggregationCursor copyInstance(CompositeAggregationCursor instance, Version version) throws IOException {
-        /* Randomly choose between internal protocol round trip and String based
-         * round trips used to toXContent. */
-        if (randomBoolean()) {
-            return super.copyInstance(instance, version);
+    protected ZoneId instanceZoneId(CompositeAggregationCursor instance) {
+        List<BucketExtractor> extractors = instance.extractors();
+        for (BucketExtractor bucketExtractor : extractors) {
+            ZoneId zoneId = MetricAggExtractorTests.extractZoneId(bucketExtractor);
+            zoneId = zoneId == null ? CompositeKeyExtractorTests.extractZoneId(bucketExtractor) : zoneId;
+
+            if (zoneId != null) {
+                return zoneId;
+            }
         }
         }
-        return (CompositeAggregationCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance));
+        return randomSafeZone();
     }
     }
 
 
     static BitSet randomBitSet(int size) {
     static BitSet randomBitSet(int size) {

+ 3 - 5
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursorTests.java

@@ -8,10 +8,9 @@ package org.elasticsearch.xpack.sql.execution.search;
 import org.elasticsearch.Version;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests;
 import org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests;
 import org.elasticsearch.xpack.sql.execution.search.extractor.ConstantExtractorTests;
 import org.elasticsearch.xpack.sql.execution.search.extractor.ConstantExtractorTests;
-import org.elasticsearch.xpack.sql.execution.search.extractor.FieldHitExtractorTests;
 import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
 import org.elasticsearch.xpack.sql.execution.search.extractor.HitExtractor;
 import org.elasticsearch.xpack.sql.session.Cursors;
 import org.elasticsearch.xpack.sql.session.Cursors;
 
 
@@ -20,7 +19,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 
 
-public class ScrollCursorTests extends AbstractWireSerializingTestCase<ScrollCursor> {
+public class ScrollCursorTests extends AbstractSqlWireSerializingTestCase<ScrollCursor> {
     public static ScrollCursor randomScrollCursor() {
     public static ScrollCursor randomScrollCursor() {
         int extractorsSize = between(1, 20);
         int extractorsSize = between(1, 20);
         List<HitExtractor> extractors = new ArrayList<>(extractorsSize);
         List<HitExtractor> extractors = new ArrayList<>(extractorsSize);
@@ -37,7 +36,6 @@ public class ScrollCursorTests extends AbstractWireSerializingTestCase<ScrollCur
             options.add(() -> ComputingExtractorTests.randomComputingExtractor());
             options.add(() -> ComputingExtractorTests.randomComputingExtractor());
         }
         }
         options.add(ConstantExtractorTests::randomConstantExtractor);
         options.add(ConstantExtractorTests::randomConstantExtractor);
-        options.add(FieldHitExtractorTests::randomFieldHitExtractor);
         return randomFrom(options).get();
         return randomFrom(options).get();
     }
     }
 
 
@@ -70,6 +68,6 @@ public class ScrollCursorTests extends AbstractWireSerializingTestCase<ScrollCur
         if (randomBoolean()) {
         if (randomBoolean()) {
             return super.copyInstance(instance, version);
             return super.copyInstance(instance, version);
         }
         }
-        return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance));
+        return (ScrollCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
     }
     }
 }
 }

+ 13 - 9
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/CompositeKeyExtractorTests.java

@@ -8,8 +8,8 @@ package org.elasticsearch.xpack.sql.execution.search.extractor;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.querydsl.container.GroupByRef.Property;
 import org.elasticsearch.xpack.sql.querydsl.container.GroupByRef.Property;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 import org.elasticsearch.xpack.sql.util.DateUtils;
@@ -22,12 +22,16 @@ import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
 import static java.util.Collections.singletonMap;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 
 
-public class CompositeKeyExtractorTests extends AbstractWireSerializingTestCase<CompositeKeyExtractor> {
+public class CompositeKeyExtractorTests extends AbstractSqlWireSerializingTestCase<CompositeKeyExtractor> {
 
 
     public static CompositeKeyExtractor randomCompositeKeyExtractor() {
     public static CompositeKeyExtractor randomCompositeKeyExtractor() {
         return new CompositeKeyExtractor(randomAlphaOfLength(16), randomFrom(asList(Property.values())), randomSafeZone(), randomBoolean());
         return new CompositeKeyExtractor(randomAlphaOfLength(16), randomFrom(asList(Property.values())), randomSafeZone(), randomBoolean());
     }
     }
 
 
+    public static CompositeKeyExtractor randomCompositeKeyExtractor(ZoneId zoneId) {
+        return new CompositeKeyExtractor(randomAlphaOfLength(16), randomFrom(asList(Property.values())), zoneId, randomBoolean());
+    }
+
     @Override
     @Override
     protected CompositeKeyExtractor createTestInstance() {
     protected CompositeKeyExtractor createTestInstance() {
         return randomCompositeKeyExtractor();
         return randomCompositeKeyExtractor();
@@ -38,6 +42,11 @@ public class CompositeKeyExtractorTests extends AbstractWireSerializingTestCase<
         return CompositeKeyExtractor::new;
         return CompositeKeyExtractor::new;
     }
     }
 
 
+    @Override
+    protected ZoneId instanceZoneId(CompositeKeyExtractor instance) {
+        return instance.zoneId();
+    }
+
     @Override
     @Override
     protected CompositeKeyExtractor mutateInstance(CompositeKeyExtractor instance) {
     protected CompositeKeyExtractor mutateInstance(CompositeKeyExtractor instance) {
         return new CompositeKeyExtractor(
         return new CompositeKeyExtractor(
@@ -79,12 +88,7 @@ public class CompositeKeyExtractorTests extends AbstractWireSerializingTestCase<
         assertEquals("Invalid date key returned: " + value, exception.getMessage());
         assertEquals("Invalid date key returned: " + value, exception.getMessage());
     }
     }
 
 
-    /**
-     * We need to exclude SystemV/* time zones because they cannot be converted
-     * back to DateTimeZone which we currently still need to do internally,
-     * e.g. in bwc serialization and in the extract() method
-     */
-    private static ZoneId randomSafeZone() {
-        return randomValueOtherThanMany(zi -> zi.getId().startsWith("SystemV"), () -> randomZone());
+    public static ZoneId extractZoneId(BucketExtractor extractor) {
+        return extractor instanceof CompositeKeyExtractor ? ((CompositeKeyExtractor) extractor).zoneId() : null;
     }
     }
 }
 }

+ 4 - 4
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/ComputingExtractorTests.java

@@ -9,10 +9,10 @@ import org.elasticsearch.common.document.DocumentField;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.expression.function.scalar.CastProcessorTests;
 import org.elasticsearch.xpack.sql.expression.function.scalar.CastProcessorTests;
 import org.elasticsearch.xpack.sql.expression.function.scalar.Processors;
 import org.elasticsearch.xpack.sql.expression.function.scalar.Processors;
-import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeProcessorTests;
+import org.elasticsearch.xpack.sql.expression.function.scalar.math.BinaryMathProcessorTests;
 import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathFunctionProcessorTests;
 import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathFunctionProcessorTests;
 import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathProcessor;
 import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathProcessor;
 import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathProcessor.MathOperation;
 import org.elasticsearch.xpack.sql.expression.function.scalar.math.MathProcessor.MathOperation;
@@ -32,7 +32,7 @@ import static java.util.Collections.singletonMap;
 import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;
 import static org.elasticsearch.xpack.sql.util.CollectionUtils.combine;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 
 
-public class ComputingExtractorTests extends AbstractWireSerializingTestCase<ComputingExtractor> {
+public class ComputingExtractorTests extends AbstractSqlWireSerializingTestCase<ComputingExtractor> {
     public static ComputingExtractor randomComputingExtractor() {
     public static ComputingExtractor randomComputingExtractor() {
         return new ComputingExtractor(randomProcessor(), randomAlphaOfLength(10));
         return new ComputingExtractor(randomProcessor(), randomAlphaOfLength(10));
     }
     }
@@ -41,8 +41,8 @@ public class ComputingExtractorTests extends AbstractWireSerializingTestCase<Com
         List<Supplier<Processor>> options = new ArrayList<>();
         List<Supplier<Processor>> options = new ArrayList<>();
         options.add(() -> ChainingProcessorTests.randomComposeProcessor());
         options.add(() -> ChainingProcessorTests.randomComposeProcessor());
         options.add(CastProcessorTests::randomCastProcessor);
         options.add(CastProcessorTests::randomCastProcessor);
-        options.add(DateTimeProcessorTests::randomDateTimeProcessor);
         options.add(MathFunctionProcessorTests::randomMathFunctionProcessor);
         options.add(MathFunctionProcessorTests::randomMathFunctionProcessor);
+        options.add(BinaryMathProcessorTests::randomProcessor);
         return randomFrom(options).get();
         return randomFrom(options).get();
     }
     }
 
 

+ 7 - 2
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/FieldHitExtractorTests.java

@@ -11,8 +11,8 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.SqlException;
 import org.elasticsearch.xpack.sql.SqlException;
 import org.elasticsearch.xpack.sql.expression.function.scalar.geo.GeoShape;
 import org.elasticsearch.xpack.sql.expression.function.scalar.geo.GeoShape;
 import org.elasticsearch.xpack.sql.type.DataType;
 import org.elasticsearch.xpack.sql.type.DataType;
@@ -37,7 +37,7 @@ import static java.util.Collections.singletonMap;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.is;
 
 
-public class FieldHitExtractorTests extends AbstractWireSerializingTestCase<FieldHitExtractor> {
+public class FieldHitExtractorTests extends AbstractSqlWireSerializingTestCase<FieldHitExtractor> {
 
 
     public static FieldHitExtractor randomFieldHitExtractor() {
     public static FieldHitExtractor randomFieldHitExtractor() {
         String hitName = randomAlphaOfLength(5);
         String hitName = randomAlphaOfLength(5);
@@ -55,6 +55,11 @@ public class FieldHitExtractorTests extends AbstractWireSerializingTestCase<Fiel
         return FieldHitExtractor::new;
         return FieldHitExtractor::new;
     }
     }
 
 
+    @Override
+    protected ZoneId instanceZoneId(FieldHitExtractor instance) {
+        return instance.zoneId();
+    }
+
     @Override
     @Override
     protected FieldHitExtractor mutateInstance(FieldHitExtractor instance) {
     protected FieldHitExtractor mutateInstance(FieldHitExtractor instance) {
         return new FieldHitExtractor(
         return new FieldHitExtractor(

+ 15 - 2
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/MetricAggExtractorTests.java

@@ -9,8 +9,8 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.SqlException;
 import org.elasticsearch.xpack.sql.SqlException;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 
 
@@ -22,13 +22,17 @@ import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static java.util.Collections.singletonMap;
 
 
-public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<MetricAggExtractor> {
+public class MetricAggExtractorTests extends AbstractSqlWireSerializingTestCase<MetricAggExtractor> {
 
 
     public static MetricAggExtractor randomMetricAggExtractor() {
     public static MetricAggExtractor randomMetricAggExtractor() {
         return new MetricAggExtractor(randomAlphaOfLength(16), randomAlphaOfLength(16), randomAlphaOfLength(16),
         return new MetricAggExtractor(randomAlphaOfLength(16), randomAlphaOfLength(16), randomAlphaOfLength(16),
             randomZone(), randomBoolean());
             randomZone(), randomBoolean());
     }
     }
 
 
+    public static MetricAggExtractor randomMetricAggExtractor(ZoneId zoneId) {
+        return new MetricAggExtractor(randomAlphaOfLength(16), randomAlphaOfLength(16), randomAlphaOfLength(16), zoneId, randomBoolean());
+    }
+
     @Override
     @Override
     protected MetricAggExtractor createTestInstance() {
     protected MetricAggExtractor createTestInstance() {
         return randomMetricAggExtractor();
         return randomMetricAggExtractor();
@@ -39,6 +43,11 @@ public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<Met
         return MetricAggExtractor::new;
         return MetricAggExtractor::new;
     }
     }
 
 
+    @Override
+    protected ZoneId instanceZoneId(MetricAggExtractor instance) {
+        return instance.zoneId();
+    }
+
     @Override
     @Override
     protected MetricAggExtractor mutateInstance(MetricAggExtractor instance) throws IOException {
     protected MetricAggExtractor mutateInstance(MetricAggExtractor instance) throws IOException {
         return new MetricAggExtractor(
         return new MetricAggExtractor(
@@ -113,4 +122,8 @@ public class MetricAggExtractorTests extends AbstractWireSerializingTestCase<Met
         Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
         Bucket bucket = new TestBucket(emptyMap(), 0, new Aggregations(singletonList(agg)));
         assertEquals(DateUtils.asDateTime((long) value , zoneId), extractor.extract(bucket));
         assertEquals(DateUtils.asDateTime((long) value , zoneId), extractor.extract(bucket));
     }
     }
+
+    public static ZoneId extractZoneId(BucketExtractor extractor) {
+        return extractor instanceof MetricAggExtractor ? ((MetricAggExtractor) extractor).zoneId() : null;
+    }
 }
 }

+ 7 - 2
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/execution/search/extractor/TopHitsAggExtractorTests.java

@@ -14,8 +14,8 @@ import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
 import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
 import org.elasticsearch.search.aggregations.metrics.InternalTopHits;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.SqlException;
 import org.elasticsearch.xpack.sql.SqlException;
 import org.elasticsearch.xpack.sql.type.DataType;
 import org.elasticsearch.xpack.sql.type.DataType;
 import org.elasticsearch.xpack.sql.util.DateUtils;
 import org.elasticsearch.xpack.sql.util.DateUtils;
@@ -28,7 +28,7 @@ import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonList;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 
 
-public class TopHitsAggExtractorTests extends AbstractWireSerializingTestCase<TopHitsAggExtractor> {
+public class TopHitsAggExtractorTests extends AbstractSqlWireSerializingTestCase<TopHitsAggExtractor> {
 
 
     public static TopHitsAggExtractor randomTopHitsAggExtractor() {
     public static TopHitsAggExtractor randomTopHitsAggExtractor() {
         return new TopHitsAggExtractor(randomAlphaOfLength(16), randomFrom(DataType.values()), randomZone());
         return new TopHitsAggExtractor(randomAlphaOfLength(16), randomFrom(DataType.values()), randomZone());
@@ -44,6 +44,11 @@ public class TopHitsAggExtractorTests extends AbstractWireSerializingTestCase<To
         return TopHitsAggExtractor::new;
         return TopHitsAggExtractor::new;
     }
     }
 
 
+    @Override
+    protected ZoneId instanceZoneId(TopHitsAggExtractor instance) {
+        return instance.zoneId();
+    }
+
     @Override
     @Override
     protected TopHitsAggExtractor mutateInstance(TopHitsAggExtractor instance) {
     protected TopHitsAggExtractor mutateInstance(TopHitsAggExtractor instance) {
         return new TopHitsAggExtractor(
         return new TopHitsAggExtractor(

+ 7 - 2
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/DateTimeProcessorTests.java

@@ -6,7 +6,7 @@
 package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 
 
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.SqlIllegalArgumentException;
 import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeProcessor.DateTimeExtractor;
 import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeProcessor.DateTimeExtractor;
 
 
@@ -17,7 +17,7 @@ import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.Da
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 import static org.hamcrest.Matchers.startsWith;
 import static org.hamcrest.Matchers.startsWith;
 
 
-public class DateTimeProcessorTests extends AbstractWireSerializingTestCase<DateTimeProcessor> {
+public class DateTimeProcessorTests extends AbstractSqlWireSerializingTestCase<DateTimeProcessor> {
 
 
     public static DateTimeProcessor randomDateTimeProcessor() {
     public static DateTimeProcessor randomDateTimeProcessor() {
         return new DateTimeProcessor(randomFrom(DateTimeExtractor.values()), randomZone());
         return new DateTimeProcessor(randomFrom(DateTimeExtractor.values()), randomZone());
@@ -33,6 +33,11 @@ public class DateTimeProcessorTests extends AbstractWireSerializingTestCase<Date
         return DateTimeProcessor::new;
         return DateTimeProcessor::new;
     }
     }
 
 
+    @Override
+    protected ZoneId instanceZoneId(DateTimeProcessor instance) {
+        return instance.zoneId();
+    }
+
     @Override
     @Override
     protected DateTimeProcessor mutateInstance(DateTimeProcessor instance) {
     protected DateTimeProcessor mutateInstance(DateTimeProcessor instance) {
         DateTimeExtractor replaced = randomValueOtherThan(instance.extractor(), () -> randomFrom(DateTimeExtractor.values()));
         DateTimeExtractor replaced = randomValueOtherThan(instance.extractor(), () -> randomFrom(DateTimeExtractor.values()));

+ 7 - 2
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/NamedDateTimeProcessorTests.java

@@ -8,7 +8,7 @@ package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 import org.elasticsearch.bootstrap.JavaVersion;
 import org.elasticsearch.bootstrap.JavaVersion;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.NamedDateTimeProcessor.NameExtractor;
 import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.NamedDateTimeProcessor.NameExtractor;
 import org.junit.Assume;
 import org.junit.Assume;
 
 
@@ -18,7 +18,7 @@ import java.time.ZoneId;
 import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeTestUtils.dateTime;
 import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeTestUtils.dateTime;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 
 
-public class NamedDateTimeProcessorTests extends AbstractWireSerializingTestCase<NamedDateTimeProcessor> {
+public class NamedDateTimeProcessorTests extends AbstractSqlWireSerializingTestCase<NamedDateTimeProcessor> {
     
     
     public static NamedDateTimeProcessor randomNamedDateTimeProcessor() {
     public static NamedDateTimeProcessor randomNamedDateTimeProcessor() {
         return new NamedDateTimeProcessor(randomFrom(NameExtractor.values()), UTC);
         return new NamedDateTimeProcessor(randomFrom(NameExtractor.values()), UTC);
@@ -40,6 +40,11 @@ public class NamedDateTimeProcessorTests extends AbstractWireSerializingTestCase
         return new NamedDateTimeProcessor(replaced, UTC);
         return new NamedDateTimeProcessor(replaced, UTC);
     }
     }
 
 
+    @Override
+    protected ZoneId instanceZoneId(NamedDateTimeProcessor instance) {
+        return instance.zoneId();
+    }
+
     public void testValidDayNamesInUTC() {
     public void testValidDayNamesInUTC() {
         assumeJava9PlusAndCompatLocaleProviderSetting();
         assumeJava9PlusAndCompatLocaleProviderSetting();
         NamedDateTimeProcessor proc = new NamedDateTimeProcessor(NameExtractor.DAY_NAME, UTC);
         NamedDateTimeProcessor proc = new NamedDateTimeProcessor(NameExtractor.DAY_NAME, UTC);

+ 7 - 2
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/NonIsoDateTimeProcessorTests.java

@@ -6,7 +6,7 @@
 package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 
 
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.NonIsoDateTimeProcessor.NonIsoDateTimeExtractor;
 import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.NonIsoDateTimeProcessor.NonIsoDateTimeExtractor;
 
 
 import java.io.IOException;
 import java.io.IOException;
@@ -15,7 +15,7 @@ import java.time.ZoneId;
 import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeTestUtils.dateTime;
 import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeTestUtils.dateTime;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 
 
-public class NonIsoDateTimeProcessorTests extends AbstractWireSerializingTestCase<NonIsoDateTimeProcessor> {
+public class NonIsoDateTimeProcessorTests extends AbstractSqlWireSerializingTestCase<NonIsoDateTimeProcessor> {
     
     
 
 
     public static NonIsoDateTimeProcessor randomNonISODateTimeProcessor() {
     public static NonIsoDateTimeProcessor randomNonISODateTimeProcessor() {
@@ -38,6 +38,11 @@ public class NonIsoDateTimeProcessorTests extends AbstractWireSerializingTestCas
         return new NonIsoDateTimeProcessor(replaced, UTC);
         return new NonIsoDateTimeProcessor(replaced, UTC);
     }
     }
 
 
+    @Override
+    protected ZoneId instanceZoneId(NonIsoDateTimeProcessor instance) {
+        return instance.zoneId();
+    }
+
     public void testNonISOWeekOfYearInUTC() {
     public void testNonISOWeekOfYearInUTC() {
         NonIsoDateTimeProcessor proc = new NonIsoDateTimeProcessor(NonIsoDateTimeExtractor.WEEK_OF_YEAR, UTC);
         NonIsoDateTimeProcessor proc = new NonIsoDateTimeProcessor(NonIsoDateTimeExtractor.WEEK_OF_YEAR, UTC);
         assertEquals(2, proc.process(dateTime(568372930000L)));  //1988-01-05T09:22:10Z[UTC]
         assertEquals(2, proc.process(dateTime(568372930000L)));  //1988-01-05T09:22:10Z[UTC]

+ 7 - 2
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/function/scalar/datetime/TimeProcessorTests.java

@@ -6,7 +6,7 @@
 package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 package org.elasticsearch.xpack.sql.expression.function.scalar.datetime;
 
 
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeProcessor.DateTimeExtractor;
 import org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeProcessor.DateTimeExtractor;
 
 
 import java.time.ZoneId;
 import java.time.ZoneId;
@@ -14,7 +14,7 @@ import java.time.ZoneId;
 import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeTestUtils.time;
 import static org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeTestUtils.time;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 import static org.elasticsearch.xpack.sql.util.DateUtils.UTC;
 
 
-public class TimeProcessorTests extends AbstractWireSerializingTestCase<TimeProcessor> {
+public class TimeProcessorTests extends AbstractSqlWireSerializingTestCase<TimeProcessor> {
 
 
     public static TimeProcessor randomTimeProcessor() {
     public static TimeProcessor randomTimeProcessor() {
         return new TimeProcessor(randomFrom(DateTimeExtractor.values()), randomZone());
         return new TimeProcessor(randomFrom(DateTimeExtractor.values()), randomZone());
@@ -30,6 +30,11 @@ public class TimeProcessorTests extends AbstractWireSerializingTestCase<TimeProc
         return TimeProcessor::new;
         return TimeProcessor::new;
     }
     }
 
 
+    @Override
+    protected ZoneId instanceZoneId(TimeProcessor instance) {
+        return instance.zoneId();
+    }
+
     @Override
     @Override
     protected TimeProcessor mutateInstance(TimeProcessor instance) {
     protected TimeProcessor mutateInstance(TimeProcessor instance) {
         DateTimeExtractor replaced = randomValueOtherThan(instance.extractor(), () -> randomFrom(DateTimeExtractor.values()));
         DateTimeExtractor replaced = randomValueOtherThan(instance.extractor(), () -> randomFrom(DateTimeExtractor.values()));

+ 2 - 3
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/gen/processor/ChainingProcessorTests.java

@@ -7,16 +7,15 @@ package org.elasticsearch.xpack.sql.expression.gen.processor;
 
 
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.sql.AbstractSqlWireSerializingTestCase;
 import org.elasticsearch.xpack.sql.expression.function.scalar.Processors;
 import org.elasticsearch.xpack.sql.expression.function.scalar.Processors;
-import org.elasticsearch.xpack.sql.expression.gen.processor.ChainingProcessor;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 
 
 import static org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests.randomProcessor;
 import static org.elasticsearch.xpack.sql.execution.search.extractor.ComputingExtractorTests.randomProcessor;
 
 
-public class ChainingProcessorTests extends AbstractWireSerializingTestCase<ChainingProcessor> {
+public class ChainingProcessorTests extends AbstractSqlWireSerializingTestCase<ChainingProcessor> {
     public static ChainingProcessor randomComposeProcessor() {
     public static ChainingProcessor randomComposeProcessor() {
         return new ChainingProcessor(randomProcessor(), randomProcessor());
         return new ChainingProcessor(randomProcessor(), randomProcessor());
     }
     }

+ 7 - 6
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/plugin/CursorTests.java

@@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.search.ClearScrollRequest;
 import org.elasticsearch.action.search.ClearScrollRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.sql.SqlException;
 import org.elasticsearch.xpack.sql.SqlException;
 import org.elasticsearch.xpack.sql.TestUtils;
 import org.elasticsearch.xpack.sql.TestUtils;
@@ -21,6 +22,7 @@ import org.elasticsearch.xpack.sql.proto.ColumnInfo;
 import org.elasticsearch.xpack.sql.proto.Mode;
 import org.elasticsearch.xpack.sql.proto.Mode;
 import org.elasticsearch.xpack.sql.session.Cursor;
 import org.elasticsearch.xpack.sql.session.Cursor;
 import org.elasticsearch.xpack.sql.session.Cursors;
 import org.elasticsearch.xpack.sql.session.Cursors;
+import org.elasticsearch.xpack.sql.session.CursorsTestUtil;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentCaptor;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -102,15 +104,14 @@ public class CursorTests extends ESTestCase {
 
 
     public void testVersionHandling() {
     public void testVersionHandling() {
         Cursor cursor = randomNonEmptyCursor();
         Cursor cursor = randomNonEmptyCursor();
-        assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(Version.CURRENT, cursor)));
+        assertEquals(cursor, Cursors.decodeFromString(Cursors.encodeToString(cursor, randomZone())));
 
 
         Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000);
         Version nextMinorVersion = Version.fromId(Version.CURRENT.id + 10000);
 
 
-        String encodedWithWrongVersion = Cursors.encodeToString(nextMinorVersion, cursor);
+        String encodedWithWrongVersion = CursorsTestUtil.encodeToString(cursor, nextMinorVersion, randomZone());
         SqlException exception = expectThrows(SqlException.class, () -> Cursors.decodeFromString(encodedWithWrongVersion));
         SqlException exception = expectThrows(SqlException.class, () -> Cursors.decodeFromString(encodedWithWrongVersion));
 
 
-        assertEquals("Unsupported cursor version " + nextMinorVersion, exception.getMessage());
+        assertEquals(LoggerMessageFormat.format("Unsupported cursor version [{}], expected [{}]", nextMinorVersion, Version.CURRENT),
+                exception.getMessage());
     }
     }
-
-
-}
+}

+ 18 - 0
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/CursorsTestUtil.java

@@ -0,0 +1,18 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.sql.session;
+
+import org.elasticsearch.Version;
+
+import java.time.ZoneId;
+
+public class CursorsTestUtil {
+
+    public static String encodeToString(Cursor info, Version version, ZoneId zoneId) {
+        return Cursors.encodeToString(info, version, zoneId);
+    }
+}

+ 2 - 4
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/session/ListCursorTests.java

@@ -9,8 +9,6 @@ import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
-import org.elasticsearch.xpack.sql.session.Cursors;
-import org.elasticsearch.xpack.sql.session.ListCursor;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
@@ -32,7 +30,7 @@ public class ListCursorTests extends AbstractWireSerializingTestCase<ListCursor>
 
 
     @Override
     @Override
     protected ListCursor mutateInstance(ListCursor instance) throws IOException {
     protected ListCursor mutateInstance(ListCursor instance) throws IOException {
-        return new ListCursor(instance.data(), 
+        return new ListCursor(instance.data(),
                 randomValueOtherThan(instance.pageSize(), () -> between(1, 20)),
                 randomValueOtherThan(instance.pageSize(), () -> between(1, 20)),
                 instance.columnCount());
                 instance.columnCount());
     }
     }
@@ -59,6 +57,6 @@ public class ListCursorTests extends AbstractWireSerializingTestCase<ListCursor>
         if (randomBoolean()) {
         if (randomBoolean()) {
             return super.copyInstance(instance, version);
             return super.copyInstance(instance, version);
         }
         }
-        return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(version, instance));
+        return (ListCursor) Cursors.decodeFromString(Cursors.encodeToString(instance, randomZone()));
     }
     }
 }
 }