1
0
Эх сурвалжийг харах

SQL: Fix ZonedDateTime with nanos serialisation (#68253)

Previously the `ConstantProcessor` was using the
`[read/write]GenericValue` for ZonedDateTime which is implemented in
`StreamInput` and doesn't read/write the nanos resolution of the
objects. Implement custom serialisation to include the nanos.

Follows: #67666
Marios Trivyzas 4 жил өмнө
parent
commit
47622d674a

+ 47 - 14
x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/expression/gen/processor/ConstantProcessor.java

@@ -10,36 +10,69 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.Objects;
 
 public class ConstantProcessor implements Processor {
 
     public static String NAME = "c";
 
-    private final Object constant;
-    private final boolean namedWriteable;
+    private Object constant;
+    private final Type type;
+
+    enum Type {
+        NAMED_WRITABLE,
+        ZONEDDATETIME,
+        GENERIC
+    }
 
     public ConstantProcessor(Object value) {
         this.constant = value;
-        this.namedWriteable = value instanceof NamedWriteable;
+        if (value instanceof NamedWriteable) {
+            type = Type.NAMED_WRITABLE;
+        } else if (value instanceof ZonedDateTime) {
+            type = Type.ZONEDDATETIME;
+        } else {
+            type = Type.GENERIC;
+        }
     }
 
     public ConstantProcessor(StreamInput in) throws IOException {
-        namedWriteable = in.readBoolean();
-        if (namedWriteable) {
-            constant = in.readNamedWriteable(ConstantNamedWriteable.class);
-        } else {
-            constant = in.readGenericValue();
+        type = in.readEnum(Type.class);
+        switch (type) {
+            case NAMED_WRITABLE:
+                constant = in.readNamedWriteable(ConstantNamedWriteable.class);
+                break;
+            case ZONEDDATETIME:
+                ZonedDateTime zdt;
+                ZoneId zoneId = in.readZoneId();
+                zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(in.readLong()), zoneId);
+                constant = zdt.withNano(in.readInt());
+                break;
+            case GENERIC:
+                constant = in.readGenericValue();
+                break;
         }
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        out.writeBoolean(namedWriteable);
-        if (namedWriteable) {
-            out.writeNamedWriteable((NamedWriteable) constant);
-        } else {
-            out.writeGenericValue(constant);
+        out.writeEnum(type);
+        switch (type) {
+            case NAMED_WRITABLE:
+                out.writeNamedWriteable((NamedWriteable) constant);
+                break;
+            case ZONEDDATETIME:
+                ZonedDateTime zdt = (ZonedDateTime) constant;
+                out.writeZoneId(zdt.getZone());
+                out.writeLong(zdt.toInstant().toEpochMilli());
+                out.writeInt(zdt.getNano());
+                break;
+            case GENERIC:
+                out.writeGenericValue(constant);
+                break;
         }
     }
 
@@ -76,4 +109,4 @@ public class ConstantProcessor implements Processor {
     public String toString() {
         return "^" + constant;
     }
-}
+}

+ 13 - 1
x-pack/plugin/ql/src/test/java/org/elasticsearch/xpack/ql/expression/gen/processor/ConstantProcessorTests.java

@@ -9,10 +9,22 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 
 import java.io.IOException;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.ZonedDateTime;
 
 public class ConstantProcessorTests extends AbstractWireSerializingTestCase<ConstantProcessor> {
+
     public static ConstantProcessor randomConstantProcessor() {
-        return new ConstantProcessor(randomAlphaOfLength(5));
+        if (randomBoolean()) {
+            Clock clock = Clock.tickMillis(randomZone());
+            if (randomBoolean()) {
+                clock = Clock.tick(clock, Duration.ofNanos(1));
+            }
+            return new ConstantProcessor( ZonedDateTime.now(clock));
+        } else {
+            return new ConstantProcessor(randomAlphaOfLength(5));
+        }
     }
 
     @Override

+ 5 - 1
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/execution/search/extractor/TopHitsAggExtractor.java

@@ -80,7 +80,11 @@ public class TopHitsAggExtractor implements BucketExtractor {
 
         Object value = agg.getHits().getAt(0).getFields().values().iterator().next().getValue();
         if (fieldDataType == DATETIME || fieldDataType == DATE) {
-            return DateUtils.asDateTimeWithMillis(Long.parseLong(value.toString()), zoneId);
+            try {
+                return DateUtils.asDateTimeWithMillis(Long.parseLong(value.toString()), zoneId);
+            } catch (NumberFormatException e) {
+                return DateUtils.asDateTimeWithNanos(value.toString()).withZoneSameInstant(zoneId);
+            }
         } else if (fieldDataType == DATETIME_NANOS) {
             return DateUtils.asDateTimeWithNanos(value.toString());
         } else if (SqlDataTypes.isTimeBased(fieldDataType)) {