Browse Source

Took time and cluster details get updated for coordinator only query operations (#114075) (#114153)

* Took time and cluster details get updated for coordinator only query operations

The ComputeService.runCompute pathway for coordinator only operations (such as 
`FROM foo | LIMIT 0` or a ROW command) get updated with overall took time.
This also includes support for cross-cluster coordinator only operations, which come
about with queries like `FROM foo,remote:foo | LIMIT 0`. The _clusters metadata is
now properly updated for those cases as well.

Fixes https://github.com/elastic/elasticsearch/issues/114014
Michael Peterson 1 year ago
parent
commit
6edc606eb9

+ 4 - 2
x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

@@ -117,7 +117,8 @@ public class RestEsqlIT extends RestEsqlTestCase {
             setLoggingLevel("INFO");
             RequestObjectBuilder builder = requestObjectBuilder().query("ROW DO_NOT_LOG_ME = 1");
             Map<String, Object> result = runEsql(builder);
-            assertEquals(2, result.size());
+            assertEquals(3, result.size());
+            assertThat(((Integer) result.get("took")).intValue(), greaterThanOrEqualTo(0));
             Map<String, String> colA = Map.of("name", "DO_NOT_LOG_ME", "type", "integer");
             assertEquals(List.of(colA), result.get("columns"));
             assertEquals(List.of(List.of(1)), result.get("values"));
@@ -136,7 +137,8 @@ public class RestEsqlIT extends RestEsqlTestCase {
             setLoggingLevel("DEBUG");
             RequestObjectBuilder builder = requestObjectBuilder().query("ROW DO_LOG_ME = 1");
             Map<String, Object> result = runEsql(builder);
-            assertEquals(2, result.size());
+            assertEquals(3, result.size());
+            assertThat(((Integer) result.get("took")).intValue(), greaterThanOrEqualTo(0));
             Map<String, String> colA = Map.of("name", "DO_LOG_ME", "type", "integer");
             assertEquals(List.of(colA), result.get("columns"));
             assertEquals(List.of(List.of(1)), result.get("values"));

+ 2 - 1
x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

@@ -249,7 +249,8 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
 
     public void testGetAnswer() throws IOException {
         Map<String, Object> answer = runEsql(requestObjectBuilder().query("row a = 1, b = 2"));
-        assertEquals(2, answer.size());
+        assertEquals(3, answer.size());
+        assertThat(((Integer) answer.get("took")).intValue(), greaterThanOrEqualTo(0));
         Map<String, String> colA = Map.of("name", "a", "type", "integer");
         Map<String, String> colB = Map.of("name", "b", "type", "integer");
         assertEquals(List.of(colA, colB), answer.get("columns"));

+ 103 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

@@ -43,6 +43,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
     private static final String REMOTE_CLUSTER = "cluster-a";
@@ -339,6 +340,108 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         }
     }
 
+    /**
+     * Searches with LIMIT 0 are used by Kibana to get a list of columns. After the initial planning
+     * (which involves cross-cluster field-caps calls), it is a coordinator only operation at query time
+     * which uses a different pathway compared to queries that require data node (and remote data node) operations
+     * at query time.
+     */
+    public void testCCSExecutionOnSearchesWithLimit0() {
+        setupTwoClusters();
+
+        // Ensure non-cross cluster queries have overall took time
+        try (EsqlQueryResponse resp = runQuery("FROM logs* | LIMIT 0")) {
+            EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            assertNotNull(executionInfo);
+            assertThat(executionInfo.isCrossClusterSearch(), is(false));
+            assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+        }
+
+        // ensure cross-cluster searches have overall took time and correct per-cluster details in EsqlExecutionInfo
+        try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:* | LIMIT 0")) {
+            EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            assertNotNull(executionInfo);
+            assertThat(executionInfo.isCrossClusterSearch(), is(true));
+            long overallTookMillis = executionInfo.overallTook().millis();
+            assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
+
+            EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
+            assertThat(remoteCluster.getIndexExpression(), equalTo("*"));
+            assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
+            assertNull(remoteCluster.getTotalShards());
+            assertNull(remoteCluster.getSuccessfulShards());
+            assertNull(remoteCluster.getSkippedShards());
+            assertNull(remoteCluster.getFailedShards());
+
+            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
+            assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
+            assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
+            assertNull(localCluster.getTotalShards());
+            assertNull(localCluster.getSuccessfulShards());
+            assertNull(localCluster.getSkippedShards());
+            assertNull(localCluster.getFailedShards());
+        }
+
+        try (EsqlQueryResponse resp = runQuery("FROM logs*,cluster-a:nomatch* | LIMIT 0")) {
+            EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            assertNotNull(executionInfo);
+            assertThat(executionInfo.isCrossClusterSearch(), is(true));
+            long overallTookMillis = executionInfo.overallTook().millis();
+            assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
+
+            EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
+            assertThat(remoteCluster.getIndexExpression(), equalTo("nomatch*"));
+            assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
+            assertThat(remoteCluster.getTook().millis(), equalTo(0L));
+            assertThat(remoteCluster.getTotalShards(), equalTo(0));
+            assertThat(remoteCluster.getSuccessfulShards(), equalTo(0));
+            assertThat(remoteCluster.getSkippedShards(), equalTo(0));
+            assertThat(remoteCluster.getFailedShards(), equalTo(0));
+
+            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
+            assertThat(localCluster.getIndexExpression(), equalTo("logs*"));
+            assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
+            assertNull(localCluster.getTotalShards());
+            assertNull(localCluster.getSuccessfulShards());
+            assertNull(localCluster.getSkippedShards());
+            assertNull(localCluster.getFailedShards());
+        }
+
+        try (EsqlQueryResponse resp = runQuery("FROM nomatch*,cluster-a:* | LIMIT 0")) {
+            EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            assertNotNull(executionInfo);
+            assertThat(executionInfo.isCrossClusterSearch(), is(true));
+            long overallTookMillis = executionInfo.overallTook().millis();
+            assertThat(overallTookMillis, greaterThanOrEqualTo(0L));
+            assertThat(executionInfo.clusterAliases(), equalTo(Set.of(REMOTE_CLUSTER, LOCAL_CLUSTER)));
+
+            EsqlExecutionInfo.Cluster remoteCluster = executionInfo.getCluster(REMOTE_CLUSTER);
+            assertThat(remoteCluster.getIndexExpression(), equalTo("*"));
+            assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(remoteCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(remoteCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
+            assertNull(remoteCluster.getTotalShards());
+            assertNull(remoteCluster.getSuccessfulShards());
+            assertNull(remoteCluster.getSkippedShards());
+            assertNull(remoteCluster.getFailedShards());
+
+            EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER);
+            assertThat(localCluster.getIndexExpression(), equalTo("nomatch*"));
+            // TODO: in https://github.com/elastic/elasticsearch/issues/112886, this will be changed to be SKIPPED
+            assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+            assertThat(localCluster.getTook().millis(), greaterThanOrEqualTo(0L));
+            assertThat(localCluster.getTook().millis(), lessThanOrEqualTo(overallTookMillis));
+        }
+    }
+
     public void testMetadataIndex() {
         Map<String, Object> testClusterInfo = setupTwoClusters();
         int localNumShards = (Integer) testClusterInfo.get("local.num_shards");

+ 28 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -171,17 +171,21 @@ public class ComputeService {
                 null,
                 null
             );
+            String local = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
             try (
                 var computeListener = ComputeListener.create(
-                    RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
+                    local,
                     transportService,
                     rootTask,
                     execInfo,
                     configuration.getQueryStartTimeNanos(),
-                    listener.map(r -> new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo))
+                    listener.map(r -> {
+                        updateExecutionInfoAfterCoordinatorOnlyQuery(configuration.getQueryStartTimeNanos(), execInfo);
+                        return new Result(physicalPlan.output(), collectedPages, r.getProfiles(), execInfo);
+                    })
                 )
             ) {
-                runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute());
+                runCompute(rootTask, computeContext, coordinatorPlan, computeListener.acquireCompute(local));
                 return;
             }
         } else {
@@ -247,6 +251,27 @@ public class ComputeService {
         }
     }
 
+    private static void updateExecutionInfoAfterCoordinatorOnlyQuery(long queryStartNanos, EsqlExecutionInfo execInfo) {
+        long tookTimeNanos = System.nanoTime() - queryStartNanos;
+        execInfo.overallTook(new TimeValue(tookTimeNanos, TimeUnit.NANOSECONDS));
+        if (execInfo.isCrossClusterSearch()) {
+            for (String clusterAlias : execInfo.clusterAliases()) {
+                // The local cluster 'took' time gets updated as part of the acquireCompute(local) call in the coordinator, so
+                // here we only need to update status for remote clusters since there are no remote ComputeListeners in this case.
+                // This happens in cross cluster searches that use LIMIT 0, e.g, FROM logs*,remote*:logs* | LIMIT 0.
+                if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
+                    execInfo.swapCluster(clusterAlias, (k, v) -> {
+                        if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
+                            return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL).build();
+                        } else {
+                            return v;
+                        }
+                    });
+                }
+            }
+        }
+    }
+
     private List<RemoteCluster> getRemoteClusters(
         Map<String, OriginalIndices> clusterToConcreteIndices,
         Map<String, OriginalIndices> clusterToOriginalIndices

+ 25 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -72,6 +72,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Predicate;
@@ -245,6 +246,7 @@ public class EsqlSession {
                 if (indexResolution.isValid()) {
                     updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
                     updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.getUnavailableClusters());
+                    updateTookTimeForRemoteClusters(executionInfo);
                     Set<String> newClusters = enrichPolicyResolver.groupIndicesPerCluster(
                         indexResolution.get().concreteIndices().toArray(String[]::new)
                     ).keySet();
@@ -285,6 +287,7 @@ public class EsqlSession {
         }
         Set<String> clustersRequested = executionInfo.clusterAliases();
         Set<String> clustersWithNoMatchingIndices = Sets.difference(clustersRequested, clustersWithResolvedIndices);
+        clustersWithNoMatchingIndices.removeAll(indexResolution.getUnavailableClusters());
         /*
          * These are clusters in the original request that are not present in the field-caps response. They were
          * specified with an index or indices that do not exist, so the search on that cluster is done.
@@ -304,6 +307,28 @@ public class EsqlSession {
         }
     }
 
+    private void updateTookTimeForRemoteClusters(EsqlExecutionInfo executionInfo) {
+        if (executionInfo.isCrossClusterSearch()) {
+            for (String clusterAlias : executionInfo.clusterAliases()) {
+                if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
+                    executionInfo.swapCluster(clusterAlias, (k, v) -> {
+                        if (v.getTook() == null && v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) {
+                            // set took time in case we are finished with the remote cluster (e.g., FROM foo | LIMIT 0).
+                            // this will be overwritten later if ES|QL operations happen on the remote cluster (the typical scenario)
+                            TimeValue took = new TimeValue(
+                                System.nanoTime() - configuration.getQueryStartTimeNanos(),
+                                TimeUnit.NANOSECONDS
+                            );
+                            return new EsqlExecutionInfo.Cluster.Builder(v).setTook(took).build();
+                        } else {
+                            return v;
+                        }
+                    });
+                }
+            }
+        }
+    }
+
     private void preAnalyzeIndices(
         LogicalPlan parsed,
         EsqlExecutionInfo executionInfo,

+ 5 - 6
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionTests.java

@@ -216,6 +216,7 @@ public class EsqlSessionTests extends ESTestCase {
                 randomMapping(),
                 Map.of("logs-a", IndexMode.STANDARD)
             );
+            // mark remote1 as unavailable
             IndexResolution indexResolution = IndexResolution.valid(esIndex, Set.of(remote1Alias));
 
             EsqlSession.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
@@ -226,12 +227,10 @@ public class EsqlSessionTests extends ESTestCase {
 
             EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias);
             assertThat(remote1Cluster.getIndexExpression(), equalTo("*"));
-            assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
-            assertThat(remote1Cluster.getTook().millis(), equalTo(0L));
-            assertThat(remote1Cluster.getTotalShards(), equalTo(0));
-            assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0));
-            assertThat(remote1Cluster.getSkippedShards(), equalTo(0));
-            assertThat(remote1Cluster.getFailedShards(), equalTo(0));
+            // remote1 is left as RUNNING, since another method (updateExecutionInfoWithUnavailableClusters) not under test changes status
+            assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.RUNNING));
+            assertNull(remote1Cluster.getTook());
+            assertNull(remote1Cluster.getTotalShards());
 
             EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias);
             assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*"));