Browse Source

Fix now in millis for ESQL search contexts (#103474)

Currently, ESQL search contexts do not have the correct actual 'now' in 
milliseconds. Ideally, we should use a consistent 'now' in millis for
all nodes participating in the execution of a search request. I will
follow up with a PR to deserialize that value between nodes for version
8.13 and later.

Closes #103455
Nhat Nguyen 1 year ago
parent
commit
90ae2151fd

+ 6 - 0
docs/changelog/103474.yaml

@@ -0,0 +1,6 @@
+pr: 103474
+summary: Fix now in millis for ESQL search contexts
+area: ES|QL
+type: bug
+issues:
+ - 103455

+ 58 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeBasedIndicesIT.java

@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+
+import java.util.List;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
+import static org.hamcrest.Matchers.hasSize;
+
+public class TimeBasedIndicesIT extends AbstractEsqlIntegTestCase {
+
+    public void testFilter() {
+        long epoch = System.currentTimeMillis();
+        assertAcked(client().admin().indices().prepareCreate("test").setMapping("@timestamp", "type=date", "value", "type=long"));
+        BulkRequestBuilder bulk = client().prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        int oldDocs = between(10, 100);
+        for (int i = 0; i < oldDocs; i++) {
+            long timestamp = epoch - TimeValue.timeValueHours(between(1, 2)).millis();
+            bulk.add(new IndexRequest().source("@timestamp", timestamp, "value", -i));
+        }
+        int newDocs = between(10, 100);
+        for (int i = 0; i < newDocs; i++) {
+            long timestamp = epoch + TimeValue.timeValueHours(between(1, 2)).millis();
+            bulk.add(new IndexRequest().source("@timestamp", timestamp, "value", i));
+        }
+        bulk.get();
+        {
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            request.query("FROM test | limit 1000");
+            request.filter(new RangeQueryBuilder("@timestamp").from(epoch - TimeValue.timeValueHours(3).millis()).to("now"));
+            try (var resp = run(request)) {
+                List<List<Object>> values = getValuesList(resp);
+                assertThat(values, hasSize(oldDocs));
+            }
+        }
+        {
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            request.query("FROM test | limit 1000");
+            request.filter(new RangeQueryBuilder("@timestamp").from("now").to(epoch + TimeValue.timeValueHours(3).millis()));
+            try (var resp = run(request)) {
+                List<List<Object>> values = getValuesList(resp);
+                assertThat(values, hasSize(newDocs));
+            }
+        }
+    }
+}

+ 9 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -328,6 +328,7 @@ public class ComputeService {
 
     private void acquireSearchContexts(
         List<ShardId> shardIds,
+        EsqlConfiguration configuration,
         Map<Index, AliasFilter> aliasFilters,
         ActionListener<List<SearchContext>> listener
     ) {
@@ -351,11 +352,12 @@ public class ComputeService {
                             try {
                                 for (IndexShard shard : targetShards) {
                                     var aliasFilter = aliasFilters.getOrDefault(shard.shardId().getIndex(), AliasFilter.EMPTY);
-                                    ShardSearchRequest shardSearchLocalRequest = new ShardSearchRequest(shard.shardId(), 0, aliasFilter);
-                                    SearchContext context = searchService.createSearchContext(
-                                        shardSearchLocalRequest,
-                                        SearchService.NO_TIMEOUT
+                                    var shardRequest = new ShardSearchRequest(
+                                        shard.shardId(),
+                                        configuration.absoluteStartedTimeInMillis(),
+                                        aliasFilter
                                     );
+                                    SearchContext context = searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT);
                                     searchContexts.add(context);
                                 }
                                 for (SearchContext searchContext : searchContexts) {
@@ -501,8 +503,9 @@ public class ComputeService {
             final var exchangeSink = exchangeService.getSinkHandler(sessionId);
             parentTask.addListener(() -> exchangeService.finishSinkHandler(sessionId, new TaskCancelledException("task cancelled")));
             final ActionListener<DataNodeResponse> listener = new OwningChannelActionListener<>(channel);
-            acquireSearchContexts(request.shardIds(), request.aliasFilters(), ActionListener.wrap(searchContexts -> {
-                var computeContext = new ComputeContext(sessionId, searchContexts, request.configuration(), null, exchangeSink);
+            final EsqlConfiguration configuration = request.configuration();
+            acquireSearchContexts(request.shardIds(), configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
+                var computeContext = new ComputeContext(sessionId, searchContexts, configuration, null, exchangeSink);
                 runCompute(parentTask, computeContext, request.plan(), ActionListener.wrap(driverProfiles -> {
                     // don't return until all pages are fetched
                     exchangeSink.addCompletionListener(

+ 9 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlConfiguration.java

@@ -112,6 +112,15 @@ public class EsqlConfiguration extends Configuration implements Writeable {
         return query;
     }
 
+    /**
+     * Returns the current time in milliseconds from the time epoch for the execution of this request.
+     * It ensures consistency by using the same value on all nodes involved in the search request.
+     * Note: Currently, it returns {@link System#currentTimeMillis()}, but this value will be serialized between nodes.
+     */
+    public long absoluteStartedTimeInMillis() {
+        return System.currentTimeMillis();
+    }
+
     /**
      * Enable profiling, sacrificing performance to return information about
      * what operations are taking the most time.