소스 검색

SQL: ConstantProcessor can now handle NamedWriteable (#39876)

Enhance ConstantProcessor to properly serialize complex objects
(Intervals) that have their own custom serialization/deserialization
mechanism

Fix #39875
Costin Leau 6 년 전
부모
커밋
ed8a1f9340

+ 1 - 1
x-pack/plugin/sql/qa/src/main/java/org/elasticsearch/xpack/sql/qa/jdbc/SpecBaseIntegrationTestCase.java

@@ -107,7 +107,7 @@ public abstract class SpecBaseIntegrationTestCase extends JdbcIntegrationTestCas
     }
 
     protected int fetchSize() {
-        return between(1, 500);
+        return between(1, 150);
     }
 
     // TODO: use UTC for now until deciding on a strategy for handling date extraction

+ 16 - 0
x-pack/plugin/sql/qa/src/main/resources/datetime.csv-spec

@@ -346,6 +346,22 @@ SELECT YEAR(NOW() - INTERVAL 2 YEARS) / 1000 AS result;
 2
 ;
 
+dateAndIntervalPaginated
+SELECT YEAR(birth_date - INTERVAL 2 YEARS) / 1000 AS result FROM test_emp ORDER BY birth_date LIMIT 10;
+
+    result     
+---------------
+1              
+1              
+1              
+1              
+1              
+1              
+1              
+1              
+1              
+1
+;
 
 currentTimestampFilter
 SELECT first_name FROM test_emp WHERE hire_date > NOW() - INTERVAL 100 YEARS ORDER BY first_name ASC LIMIT 10; 

+ 26 - 2
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/expression/gen/processor/ConstantProcessor.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.sql.expression.gen.processor;
 
+import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 
@@ -16,17 +17,40 @@ public class ConstantProcessor implements Processor {
     public static String NAME = "c";
 
     private final Object constant;
+    private final boolean namedWriteable;
+    private final Class<?> clazz;
 
     public ConstantProcessor(Object value) {
         this.constant = value;
+        this.namedWriteable = value instanceof NamedWriteable;
+        this.clazz = namedWriteable ? value.getClass() : null;
     }
 
+    @SuppressWarnings("unchecked")
     public ConstantProcessor(StreamInput in) throws IOException {
-        constant = in.readGenericValue();
+        namedWriteable = in.readBoolean();
+        if (namedWriteable) {
+            try {
+                clazz = ConstantProcessor.class.getClassLoader().loadClass(in.readString());
+            } catch (ClassNotFoundException e) {
+                throw new IOException(e);
+            }
+            constant = in.readNamedWriteable((Class<NamedWriteable>) clazz);
+        } else {
+            clazz = null;
+            constant = in.readGenericValue();
+        }
     }
 
+    @Override
     public void writeTo(StreamOutput out) throws IOException {
-        out.writeGenericValue(constant);
+        out.writeBoolean(namedWriteable);
+        if (namedWriteable) {
+            out.writeString(constant.getClass().getName());
+            out.writeNamedWriteable((NamedWriteable) constant);
+        } else {
+            out.writeGenericValue(constant);
+        }
     }
 
     @Override

+ 8 - 2
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/expression/gen/processor/ConstantProcessorTests.java

@@ -7,9 +7,12 @@ package org.elasticsearch.xpack.sql.expression.gen.processor;
 
 import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
-import org.elasticsearch.xpack.sql.expression.gen.processor.ConstantProcessor;
+import org.elasticsearch.xpack.sql.expression.literal.IntervalDayTime;
+import org.elasticsearch.xpack.sql.type.DataType;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 
 public class ConstantProcessorTests extends AbstractWireSerializingTestCase<ConstantProcessor> {
     public static ConstantProcessor randomConstantProcessor() {
@@ -28,7 +31,10 @@ public class ConstantProcessorTests extends AbstractWireSerializingTestCase<Cons
 
     @Override
     protected ConstantProcessor mutateInstance(ConstantProcessor instance) throws IOException {
-        return new ConstantProcessor(randomValueOtherThan(instance.process(null), () -> randomAlphaOfLength(5)));
+        return new ConstantProcessor(randomValueOtherThan(instance.process(null),
+                () -> new IntervalDayTime(Duration.ofSeconds(
+                        randomLongBetween(TimeUnit.SECONDS.convert(3, TimeUnit.HOURS), TimeUnit.SECONDS.convert(23, TimeUnit.HOURS))),
+                        DataType.INTERVAL_DAY_TO_SECOND)));
     }
 
     public void testApply() {