Browse Source

Implement now() function (ESQL-1172)

returns current datetime
Luigi Dell'Aquila 2 years ago
parent
commit
c2c0b0fa0d
15 changed files with 220 additions and 19 deletions
  1. 2 0
      docs/reference/esql/esql-functions.asciidoc
  2. 8 0
      docs/reference/esql/functions/now.asciidoc
  3. 17 5
      x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/EvaluatorImplementer.java
  4. 22 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/date.csv-spec
  5. 1 0
      x-pack/plugin/esql/qa/testFixtures/src/main/resources/show.csv-spec
  6. 42 0
      x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/date/NowEvaluator.java
  7. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java
  8. 3 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
  9. 86 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/date/Now.java
  10. 8 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java
  11. 16 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java
  12. 9 6
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/show/ShowFunctions.java
  13. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java
  14. 1 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java
  15. 3 2
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java

+ 2 - 0
docs/reference/esql/esql-functions.asciidoc

@@ -30,6 +30,7 @@ these functions:
 * <<esql-mv_median>>
 * <<esql-mv_min>>
 * <<esql-mv_sum>>
+* <<esql-now>>
 * <<esql-pow>>
 * <<esql-round>>
 * <<esql-split>>
@@ -64,6 +65,7 @@ include::functions/mv_max.asciidoc[]
 include::functions/mv_median.asciidoc[]
 include::functions/mv_min.asciidoc[]
 include::functions/mv_sum.asciidoc[]
+include::functions/now.asciidoc[]
 include::functions/pow.asciidoc[]
 include::functions/round.asciidoc[]
 include::functions/split.asciidoc[]

+ 8 - 0
docs/reference/esql/functions/now.asciidoc

@@ -0,0 +1,8 @@
+[[esql-now]]
+=== `NOW`
+Returns current date and time.
+
+[source,esql]
+----
+ROW current_date = NOW()
+----

+ 17 - 5
x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/EvaluatorImplementer.java

@@ -80,7 +80,9 @@ public class EvaluatorImplementer {
 
         builder.addMethod(ctor());
         builder.addMethod(eval());
-        builder.addMethod(realEval(true));
+        if (processFunction.args.stream().anyMatch(x -> x instanceof FixedProcessFunctionArg == false)) {
+            builder.addMethod(realEval(true));
+        }
         builder.addMethod(realEval(false));
         builder.addMethod(toStringMethod());
         return builder.build();
@@ -108,10 +110,20 @@ public class EvaluatorImplementer {
     }
 
     private String invokeRealEval(boolean blockStyle) {
-        return "return eval(page.getPositionCount(), "
-            + processFunction.args.stream().map(a -> a.paramName(blockStyle)).filter(a -> a != null).collect(Collectors.joining(", "))
-            + ")"
-            + (processFunction.resultDataType(blockStyle).simpleName().endsWith("Vector") ? ".asBlock()" : "");
+        StringBuilder builder = new StringBuilder("return eval(page.getPositionCount()");
+        String params = processFunction.args.stream()
+            .map(a -> a.paramName(blockStyle))
+            .filter(a -> a != null)
+            .collect(Collectors.joining(", "));
+        if (params.length() > 0) {
+            builder.append(", ");
+            builder.append(params);
+        }
+        builder.append(")");
+        if (processFunction.resultDataType(blockStyle).simpleName().endsWith("Vector")) {
+            builder.append(".asBlock()");
+        }
+        return builder.toString();
     }
 
     private MethodSpec realEval(boolean blockStyle) {

+ 22 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/date.csv-spec

@@ -281,6 +281,28 @@ hire_date:date           | hd:date
 1985-11-20T00:00:00.000Z | 1985-11-18T00:00:00.000Z
 ;
 
+now
+row a = now() | eval x = a == now(), y = substring(date_format(a, "yyyy"), 0, 2) | keep x, y;
+
+x:boolean  | y:keyword
+true       | 20
+; 
+
+born_before_today
+from employees | where birth_date < now() | sort emp_no asc | keep emp_no, birth_date| limit 1;
+
+emp_no:integer  | birth_date:date
+10001           | 1953-09-02T00:00:00Z
+;
+
+
+born_after_today
+from employees | where birth_date > now() | sort emp_no asc | keep emp_no, birth_date| limit 1;
+
+emp_no:integer  | birth_date:date
+;
+
+
 autoBucketMonthInAgg
 // tag::auto_bucket_in_agg[]
 FROM employees

+ 1 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/resources/show.csv-spec

@@ -37,6 +37,7 @@ mv_max                   |mv_max(arg1)
 mv_median                |mv_median(arg1)
 mv_min                   |mv_min(arg1)
 mv_sum                   |mv_sum(arg1)
+now                      |now()
 percentile               |percentile(arg1, arg2)
 pow                      |pow(arg1, arg2)
 round                    |round(arg1, arg2)

+ 42 - 0
x-pack/plugin/esql/src/main/java/generated/org/elasticsearch/xpack/esql/expression/function/scalar/date/NowEvaluator.java

@@ -0,0 +1,42 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.xpack.esql.expression.function.scalar.date;
+
+import java.lang.Override;
+import java.lang.String;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.LongVector;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.EvalOperator;
+
+/**
+ * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Now}.
+ * This class is generated. Do not edit it.
+ */
+public final class NowEvaluator implements EvalOperator.ExpressionEvaluator {
+  private final long now;
+
+  public NowEvaluator(long now) {
+    this.now = now;
+  }
+
+  @Override
+  public Block eval(Page page) {
+    return eval(page.getPositionCount()).asBlock();
+  }
+
+  public LongVector eval(int positionCount) {
+    LongVector.Builder result = LongVector.newVectorBuilder(positionCount);
+    position: for (int p = 0; p < positionCount; p++) {
+      result.appendLong(Now.process(now));
+    }
+    return result.build();
+  }
+
+  @Override
+  public String toString() {
+    return "NowEvaluator[" + "now=" + now + "]";
+  }
+}

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java

@@ -277,7 +277,7 @@ public class EnrichLookupService {
             this.matchType = in.readString();
             this.matchField = in.readString();
             this.inputPage = new Page(in);
-            PlanStreamInput planIn = new PlanStreamInput(in, PlanNameRegistry.INSTANCE, in.namedWriteableRegistry());
+            PlanStreamInput planIn = new PlanStreamInput(in, PlanNameRegistry.INSTANCE, in.namedWriteableRegistry(), null);
             this.extractFields = planIn.readList(readerFromPlanReader(PlanStreamInput::readNamedExpression));
         }
 

+ 3 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java

@@ -29,6 +29,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToVersion
 import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateFormat;
 import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateParse;
 import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateTrunc;
+import org.elasticsearch.xpack.esql.expression.function.scalar.date.Now;
 import org.elasticsearch.xpack.esql.expression.function.scalar.ip.CIDRMatch;
 import org.elasticsearch.xpack.esql.expression.function.scalar.math.Abs;
 import org.elasticsearch.xpack.esql.expression.function.scalar.math.AutoBucket;
@@ -99,7 +100,8 @@ public class EsqlFunctionRegistry extends FunctionRegistry {
             new FunctionDefinition[] {
                 def(DateFormat.class, DateFormat::new, "date_format"),
                 def(DateParse.class, DateParse::new, "date_parse"),
-                def(DateTrunc.class, DateTrunc::new, "date_trunc"), },
+                def(DateTrunc.class, DateTrunc::new, "date_trunc"),
+                def(Now.class, Now::new, "now") },
             // conditional
             new FunctionDefinition[] { def(Case.class, Case::new, "case"), def(IsNull.class, IsNull::new, "is_null"), },
             // IP

+ 86 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/date/Now.java

@@ -0,0 +1,86 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.date;
+
+import org.elasticsearch.compute.ann.Evaluator;
+import org.elasticsearch.compute.ann.Fixed;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.xpack.esql.planner.Mappable;
+import org.elasticsearch.xpack.ql.expression.Expression;
+import org.elasticsearch.xpack.ql.expression.function.scalar.ConfigurationFunction;
+import org.elasticsearch.xpack.ql.expression.gen.script.ScriptTemplate;
+import org.elasticsearch.xpack.ql.session.Configuration;
+import org.elasticsearch.xpack.ql.tree.NodeInfo;
+import org.elasticsearch.xpack.ql.tree.Source;
+import org.elasticsearch.xpack.ql.type.DataType;
+import org.elasticsearch.xpack.ql.type.DataTypes;
+
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class Now extends ConfigurationFunction implements Mappable {
+
+    private final long now;
+
+    public Now(Source source, Configuration configuration) {
+        super(source, List.of(), configuration);
+        this.now = configuration.now() == null ? System.currentTimeMillis() : configuration.now().toInstant().toEpochMilli();
+    }
+
+    private Now(Source source, long now) {
+        super(source, List.of(), null);
+        this.now = now;
+    }
+
+    public static Now newInstance(Source source, long now) {
+        return new Now(source, now);
+    }
+
+    @Override
+    public Object fold() {
+        return now;
+    }
+
+    @Override
+    public boolean foldable() {
+        return true;
+    }
+
+    @Override
+    public DataType dataType() {
+        return DataTypes.DATETIME;
+    }
+
+    @Evaluator
+    static long process(@Fixed long now) {
+        return now;
+    }
+
+    @Override
+    public Expression replaceChildren(List<Expression> newChildren) {
+        return this;
+    }
+
+    @Override
+    protected NodeInfo<? extends Expression> info() {
+        return NodeInfo.create(this, Now::new, configuration());
+    }
+
+    @Override
+    public Supplier<EvalOperator.ExpressionEvaluator> toEvaluator(
+        Function<Expression, Supplier<EvalOperator.ExpressionEvaluator>> toEvaluator
+    ) {
+        return () -> new NowEvaluator(now);
+    }
+
+    @Override
+    public ScriptTemplate asScript() {
+        throw new UnsupportedOperationException();
+    }
+}

+ 8 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypes.java

@@ -39,6 +39,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToVersion
 import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateFormat;
 import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateParse;
 import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateTrunc;
+import org.elasticsearch.xpack.esql.expression.function.scalar.date.Now;
 import org.elasticsearch.xpack.esql.expression.function.scalar.ip.CIDRMatch;
 import org.elasticsearch.xpack.esql.expression.function.scalar.math.Abs;
 import org.elasticsearch.xpack.esql.expression.function.scalar.math.AutoBucket;
@@ -278,6 +279,7 @@ public final class PlanNamedTypes {
             of(ScalarFunction.class, DateFormat.class, PlanNamedTypes::writeDateFormat, PlanNamedTypes::readDateFormat),
             of(ScalarFunction.class, DateParse.class, PlanNamedTypes::writeDateTimeParse, PlanNamedTypes::readDateTimeParse),
             of(ScalarFunction.class, DateTrunc.class, PlanNamedTypes::writeDateTrunc, PlanNamedTypes::readDateTrunc),
+            of(ScalarFunction.class, Now.class, PlanNamedTypes::writeNow, PlanNamedTypes::readNow),
             of(ScalarFunction.class, Round.class, PlanNamedTypes::writeRound, PlanNamedTypes::readRound),
             of(ScalarFunction.class, Pow.class, PlanNamedTypes::writePow, PlanNamedTypes::readPow),
             of(ScalarFunction.class, StartsWith.class, PlanNamedTypes::writeStartsWith, PlanNamedTypes::readStartsWith),
@@ -1040,6 +1042,12 @@ public final class PlanNamedTypes {
         out.writeExpression(fields.get(1));
     }
 
+    static Now readNow(PlanStreamInput in) throws IOException {
+        return new Now(Source.EMPTY, in.configuration());
+    }
+
+    static void writeNow(PlanStreamOutput out, Now function) {}
+
     static Round readRound(PlanStreamInput in) throws IOException {
         return new Round(Source.EMPTY, in.readExpression(), in.readOptionalNamed(Expression.class));
     }

+ 16 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/io/stream/PlanStreamInput.java

@@ -15,6 +15,7 @@ import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanNamedReader;
 import org.elasticsearch.xpack.esql.io.stream.PlanNameRegistry.PlanReader;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
+import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
 import org.elasticsearch.xpack.ql.expression.Attribute;
 import org.elasticsearch.xpack.ql.expression.AttributeSet;
@@ -43,19 +44,28 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput {
     // hook for nameId, where can cache and map, for now just return a NameId of the same long value.
     private final LongFunction<NameId> nameIdFunction;
 
-    public PlanStreamInput(StreamInput streamInput, PlanNameRegistry registry, NamedWriteableRegistry namedWriteableRegistry) {
-        this(streamInput, registry, namedWriteableRegistry, DEFAULT_NAME_ID_FUNC);
+    private EsqlConfiguration configuration;
+
+    public PlanStreamInput(
+        StreamInput streamInput,
+        PlanNameRegistry registry,
+        NamedWriteableRegistry namedWriteableRegistry,
+        EsqlConfiguration configuration
+    ) {
+        this(streamInput, registry, namedWriteableRegistry, configuration, DEFAULT_NAME_ID_FUNC);
     }
 
     public PlanStreamInput(
         StreamInput streamInput,
         PlanNameRegistry registry,
         NamedWriteableRegistry namedWriteableRegistry,
+        EsqlConfiguration configuration,
         LongFunction<NameId> nameIdFunction
     ) {
         super(streamInput, namedWriteableRegistry);
         this.registry = registry;
         this.nameIdFunction = nameIdFunction;
+        this.configuration = configuration;
     }
 
     NameId nameIdFromLongValue(long value) {
@@ -146,6 +156,10 @@ public final class PlanStreamInput extends NamedWriteableAwareStreamInput {
         return new AttributeSet(builder);
     }
 
+    public EsqlConfiguration configuration() throws IOException {
+        return configuration;
+    }
+
     static void throwOnNullOptionalRead(Class<?> type) throws IOException {
         final IOException e = new IOException("read optional named returned null which is not allowed, type:" + type);
         assert false : e;

+ 9 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/show/ShowFunctions.java

@@ -14,6 +14,7 @@ import org.elasticsearch.xpack.ql.expression.ReferenceAttribute;
 import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry;
 import org.elasticsearch.xpack.ql.plan.logical.LeafPlan;
 import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
+import org.elasticsearch.xpack.ql.session.Configuration;
 import org.elasticsearch.xpack.ql.tree.NodeInfo;
 import org.elasticsearch.xpack.ql.tree.Source;
 
@@ -53,12 +54,14 @@ public class ShowFunctions extends LeafPlan {
             if (constructors.length > 0) {
                 var params = constructors[0].getParameters(); // no multiple c'tors supported
                 for (int i = 1; i < params.length; i++) { // skipping 1st argument, the source
-                    if (i > 1) {
-                        sb.append(", ");
-                    }
-                    sb.append(params[i].getName());
-                    if (List.class.isAssignableFrom(params[i].getType())) {
-                        sb.append("...");
+                    if (Configuration.class.isAssignableFrom(params[i].getType()) == false) {
+                        if (i > 1) {
+                            sb.append(", ");
+                        }
+                        sb.append(params[i].getName());
+                        if (List.class.isAssignableFrom(params[i].getType())) {
+                            sb.append("...");
+                        }
                     }
                 }
             }

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequest.java

@@ -59,7 +59,7 @@ final class DataNodeRequest extends TransportRequest implements IndicesRequest {
         this.configuration = new EsqlConfiguration(in);
         this.shardIds = in.readList(ShardId::new);
         this.aliasFilters = in.readMap(Index::new, AliasFilter::readFrom);
-        this.plan = new PlanStreamInput(in, planNameRegistry, in.namedWriteableRegistry()).readPhysicalPlanNode();
+        this.plan = new PlanStreamInput(in, planNameRegistry, in.namedWriteableRegistry(), configuration).readPhysicalPlanNode();
     }
 
     @Override

+ 1 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java

@@ -55,7 +55,7 @@ public class SerializationTestUtils {
                 ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())),
                 writableRegistry()
             );
-            PlanStreamInput planStreamInput = new PlanStreamInput(in, planNameRegistry, writableRegistry());
+            PlanStreamInput planStreamInput = new PlanStreamInput(in, planNameRegistry, writableRegistry(), EsqlTestUtils.TEST_CFG);
             return deserializer.read(planStreamInput);
         } catch (IOException e) {
             throw new UncheckedIOException(e);

+ 3 - 2
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/io/stream/PlanNamedTypesTests.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.dissect.DissectParser;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.EqualsHashCodeTestUtils;
+import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.SerializationTestUtils;
 import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Avg;
@@ -154,7 +155,7 @@ public class PlanNamedTypesTests extends ESTestCase {
         // read
         StreamInput in = ByteBufferStreamInput.wrap(BytesReference.toBytes(bso.bytes()));
         assertThat(in.readString(), equalTo("hello"));
-        var planStreamInput = new PlanStreamInput(in, planNameRegistry, SerializationTestUtils.writableRegistry());
+        var planStreamInput = new PlanStreamInput(in, planNameRegistry, SerializationTestUtils.writableRegistry(), EsqlTestUtils.TEST_CFG);
         var deser = (RowExec) planStreamInput.readPhysicalPlanNode();
         EqualsHashCodeTestUtils.checkEqualsAndHashCode(plan, unused -> deser);
         assertThat(in.readVInt(), equalTo(11_345));
@@ -558,6 +559,6 @@ public class PlanNamedTypesTests extends ESTestCase {
             ByteBufferStreamInput.wrap(BytesReference.toBytes(out.bytes())),
             SerializationTestUtils.writableRegistry()
         );
-        return new PlanStreamInput(in, planNameRegistry, SerializationTestUtils.writableRegistry());
+        return new PlanStreamInput(in, planNameRegistry, SerializationTestUtils.writableRegistry(), EsqlTestUtils.TEST_CFG);
     }
 }