Pārlūkot izejas kodu

Fork field-caps for ES|QL (#110738)

We need to fork the field-caps API for ES|QL to allow changes to the new
internal API without risking breaking the external field-caps API.
Nhat Nguyen 1 gadu atpakaļ
vecāks
revīzija
04845342f4

+ 9 - 4
docs/reference/esql/esql-across-clusters.asciidoc

@@ -8,6 +8,11 @@
 
 preview::["{ccs-cap} for {esql} is in technical preview and may be changed or removed in a future release. Elastic will work to fix any issues, but features in technical preview are not subject to the support SLA of official GA features."]
 
+[NOTE]
+====
+For {ccs-cap} with {esql} on version 8.16 or later, remote clusters must also be on version 8.16 or later.
+====
+
 With {esql}, you can execute a single query across multiple clusters.
 
 [discrete]
@@ -64,7 +69,7 @@ You will need to:
 * Create an API key on the *remote cluster* using the <<security-api-create-cross-cluster-api-key,Create cross-cluster API key>> API or using the {kibana-ref}/api-keys.html[Kibana API keys UI].
 * Add the API key to the keystore on the *local cluster*, as part of the steps in <<remote-clusters-security-api-key-local-actions,configuring the local cluster>>. All cross-cluster requests from the local cluster are bound by the API key’s privileges.
 
-Using {esql} with the API key based security model requires some additional permissions that may not be needed when using the traditional query DSL based search. 
+Using {esql} with the API key based security model requires some additional permissions that may not be needed when using the traditional query DSL based search.
 The following example API call creates a role that can query remote indices using {esql} when using the API key based security model.
 
 [source,console]
@@ -73,11 +78,11 @@ POST /_security/role/remote1
 {
   "cluster": ["cross_cluster_search"], <1>
   "indices": [
-    { 
+    {
       "names" : [""], <2>
       "privileges": ["read"]
     }
-  ], 
+  ],
   "remote_indices": [ <3>
     {
       "names": [ "logs-*" ],
@@ -93,7 +98,7 @@ POST /_security/role/remote1
 <3> The indices allowed read access to the remote cluster. The configured <<security-api-create-cross-cluster-api-key,cross-cluster API key>> must also allow this index to be read.
 <4> The `read_cross_cluster` privilege is always required when using {esql} across clusters with the API key based security model.
 <5> The remote clusters to which these privileges apply.
-This remote cluster must be configured with a <<security-api-create-cross-cluster-api-key,cross-cluster API key>> and connected to the remote cluster before the remote index can be queried. 
+This remote cluster must be configured with a <<security-api-create-cross-cluster-api-key,cross-cluster API key>> and connected to the remote cluster before the remote index can be queried.
 Verify connection using the <<cluster-remote-info, Remote cluster info>> API.
 
 You will then need a user or API key with the permissions you created above. The following example API call creates a user with the `remote1` role.

+ 17 - 3
server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

@@ -111,11 +111,25 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
 
     @Override
     protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
+        executeRequest(task, request, REMOTE_TYPE, listener);
+    }
+
+    public void executeRequest(
+        Task task,
+        FieldCapabilitiesRequest request,
+        RemoteClusterActionType<FieldCapabilitiesResponse> remoteAction,
+        ActionListener<FieldCapabilitiesResponse> listener
+    ) {
         // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
-        searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, l)));
+        searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteAction, l)));
     }
 
-    private void doExecuteForked(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
+    private void doExecuteForked(
+        Task task,
+        FieldCapabilitiesRequest request,
+        RemoteClusterActionType<FieldCapabilitiesResponse> remoteAction,
+        ActionListener<FieldCapabilitiesResponse> listener
+    ) {
         if (ccsCheckCompatibility) {
             checkCCSVersionCompatibility(request);
         }
@@ -249,7 +263,7 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
                     }
                 });
                 remoteClusterClient.execute(
-                    TransportFieldCapabilitiesAction.REMOTE_TYPE,
+                    remoteAction,
                     remoteRequest,
                     // The underlying transport service may call onFailure with a thread pool other than search_coordinator.
                     // This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator.

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java

@@ -86,6 +86,7 @@ public final class IndexPrivilege extends Privilege {
         TransportClusterSearchShardsAction.TYPE.name(),
         TransportSearchShardsAction.TYPE.name(),
         TransportResolveClusterAction.NAME,
+        "indices:data/read/esql/resolve_fields",
         "indices:data/read/esql",
         "indices:data/read/esql/compute"
     );

+ 15 - 10
x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java

@@ -138,19 +138,24 @@ public class EsqlSecurityIT extends ESRestTestCase {
     }
 
     public void testInsufficientPrivilege() {
-        Exception error = expectThrows(
-            Exception.class,
-            () -> runESQLCommand("metadata1_read2", "FROM index-user1,index-user2 | STATS sum=sum(value)")
-        );
+        Exception error = expectThrows(Exception.class, () -> runESQLCommand("metadata1_read2", "FROM index-user1 | STATS sum=sum(value)"));
         logger.info("error", error);
+        assertThat(error.getMessage(), containsString("Unknown index [index-user1]"));
+    }
+
+    public void testLimitedPrivilege() throws Exception {
+        Response resp = runESQLCommand("metadata1_read2", """
+            FROM index-user1,index-user2 METADATA _index
+            | STATS sum=sum(value), index=VALUES(_index)
+            """);
+        assertOK(resp);
+        Map<String, Object> respMap = entityAsMap(resp);
         assertThat(
-            error.getMessage(),
-            containsString(
-                "unauthorized for user [test-admin] run as [metadata1_read2] "
-                    + "with effective roles [metadata1_read2] on indices [index-user1], "
-                    + "this action is granted by the index privileges [read,all]"
-            )
+            respMap.get("columns"),
+            equalTo(List.of(Map.of("name", "sum", "type", "double"), Map.of("name", "index", "type", "keyword")))
         );
+        assertThat(respMap.get("values"), equalTo(List.of(List.of(72.0, "index-user2"))));
+
     }
 
     public void testDocumentLevelSecurity() throws Exception {

+ 2 - 3
x-pack/plugin/esql/qa/server/multi-clusters/build.gradle

@@ -19,9 +19,8 @@ dependencies {
 }
 
 def supportedVersion = bwcVersion -> {
-  // This test is less restricted than the actual CCS compatibility matrix that we are supporting.
-  // CCQ is available on 8.13 or later
-  return bwcVersion.onOrAfter(Version.fromString("8.13.0"));
+  // ESQL requires its own resolve_fields API
+  return bwcVersion.onOrAfter(Version.fromString("8.16.0"));
 }
 
 BuildParams.bwcVersions.withWireCompatible(supportedVersion) { bwcVersion, baseName ->

+ 1 - 0
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

@@ -104,6 +104,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
     protected void shouldSkipTest(String testName) throws IOException {
         super.shouldSkipTest(testName);
         checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, testCase);
+        assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api"));
         assumeFalse("can't test with _index metadata", hasIndexMetadata(testCase.query));
         assumeTrue(
             "Test " + testName + " is skipped on " + Clusters.oldVersion(),

+ 16 - 0
x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClustersIT.java

@@ -28,6 +28,7 @@ import org.junit.rules.TestRule;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -61,6 +62,7 @@ public class MultiClustersIT extends ESRestTestCase {
 
     @Before
     public void setUpIndices() throws Exception {
+        assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api"));
         final String mapping = """
              "properties": {
                "data": { "type": "long" },
@@ -201,4 +203,18 @@ public class MultiClustersIT extends ESRestTestCase {
         var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
         return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
     }
+
+    private TestFeatureService remoteFeaturesService() throws IOException {
+        if (remoteFeaturesService == null) {
+            try (RestClient remoteClient = remoteClusterClient()) {
+                var remoteNodeVersions = readVersionsFromNodesInfo(remoteClient);
+                var semanticNodeVersions = remoteNodeVersions.stream()
+                    .map(ESRestTestCase::parseLegacyVersion)
+                    .flatMap(Optional::stream)
+                    .collect(Collectors.toSet());
+                remoteFeaturesService = createTestFeatureService(getClusterStateFeatures(remoteClient), semanticNodeVersions);
+            }
+        }
+        return remoteFeaturesService;
+    }
 }

+ 52 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java

@@ -0,0 +1,52 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.RemoteClusterActionType;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
+import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.transport.TransportService;
+
+/**
+ * A fork of the field-caps API for ES|QL. This fork allows us to gradually introduce features and optimizations to this internal
+ * API without risking breaking the external field-caps API. For now, this API delegates to the field-caps API, but gradually,
+ * we will decouple this API completely from the field-caps.
+ */
+public class EsqlResolveFieldsAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
+    public static final String NAME = "indices:data/read/esql/resolve_fields";
+    public static final ActionType<FieldCapabilitiesResponse> TYPE = new ActionType<>(NAME);
+    public static final RemoteClusterActionType<FieldCapabilitiesResponse> REMOTE_TYPE = new RemoteClusterActionType<>(
+        NAME,
+        FieldCapabilitiesResponse::new
+    );
+
+    private final TransportFieldCapabilitiesAction fieldCapsAction;
+
+    @Inject
+    public EsqlResolveFieldsAction(
+        TransportService transportService,
+        ActionFilters actionFilters,
+        TransportFieldCapabilitiesAction fieldCapsAction
+    ) {
+        // TODO replace DIRECT_EXECUTOR_SERVICE when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
+        super(NAME, transportService, actionFilters, FieldCapabilitiesRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
+        this.fieldCapsAction = fieldCapsAction;
+    }
+
+    @Override
+    protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
+        fieldCapsAction.executeRequest(task, request, REMOTE_TYPE, listener);
+    }
+}

+ 7 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFeatures.java

@@ -174,6 +174,11 @@ public class EsqlFeatures implements FeatureSpecification {
      */
     public static final NodeFeature METRICS_SYNTAX = new NodeFeature("esql.metrics_syntax");
 
+    /**
+     * Internal resolve_fields API for ES|QL
+     */
+    public static final NodeFeature RESOLVE_FIELDS_API = new NodeFeature("esql.resolve_fields_api");
+
     private Set<NodeFeature> snapshotBuildFeatures() {
         assert Build.current().isSnapshot() : Build.current();
         return Set.of(METRICS_SYNTAX);
@@ -202,7 +207,8 @@ public class EsqlFeatures implements FeatureSpecification {
             STRING_LITERAL_AUTO_CASTING_EXTENDED,
             METADATA_FIELDS,
             TIMESPAN_ABBREVIATIONS,
-            COUNTER_TYPES
+            COUNTER_TYPES,
+            RESOLVE_FIELDS_API
         );
         if (Build.current().isSnapshot()) {
             return Collections.unmodifiableSet(Sets.union(features, snapshotBuildFeatures()));

+ 3 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -51,6 +51,7 @@ import org.elasticsearch.xpack.esql.EsqlUsageTransportAction;
 import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.action.EsqlQueryRequestBuilder;
+import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlDeleteAsyncResultAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlGetAsyncResultAction;
@@ -144,7 +145,8 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
             new ActionHandler<>(EsqlAsyncGetResultAction.INSTANCE, TransportEsqlAsyncGetResultsAction.class),
             new ActionHandler<>(EsqlStatsAction.INSTANCE, TransportEsqlStatsAction.class),
             new ActionHandler<>(XPackUsageFeatureAction.ESQL, EsqlUsageTransportAction.class),
-            new ActionHandler<>(XPackInfoFeatureAction.ESQL, EsqlInfoTransportAction.class)
+            new ActionHandler<>(XPackInfoFeatureAction.ESQL, EsqlInfoTransportAction.class),
+            new ActionHandler<>(EsqlResolveFieldsAction.TYPE, EsqlResolveFieldsAction.class)
         );
     }
 

+ 3 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java

@@ -16,6 +16,7 @@ import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.index.mapper.TimeSeriesParams;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction;
 import org.elasticsearch.xpack.esql.core.index.EsIndex;
 import org.elasticsearch.xpack.esql.core.index.IndexResolution;
 import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -75,7 +76,8 @@ public class IndexResolver {
      * Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping.
      */
     public void resolveAsMergedMapping(String indexWildcard, Set<String> fieldNames, ActionListener<IndexResolution> listener) {
-        client.fieldCaps(
+        client.execute(
+            EsqlResolveFieldsAction.TYPE,
             createFieldCapsRequest(indexWildcard, fieldNames),
             listener.delegateFailureAndWrap((l, response) -> l.onResponse(mergedMappings(indexWildcard, response)))
         );

+ 6 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/stats/PlanExecutorMetricsTests.java

@@ -19,6 +19,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
+import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction;
 import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
 import org.elasticsearch.xpack.esql.execution.PlanExecutor;
@@ -39,6 +40,7 @@ import java.util.function.BiConsumer;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -80,21 +82,21 @@ public class PlanExecutorMetricsTests extends ESTestCase {
         when(fieldCapabilitiesResponse.get()).thenReturn(fields(indices));
         doAnswer((Answer<Void>) invocation -> {
             @SuppressWarnings("unchecked")
-            ActionListener<FieldCapabilitiesResponse> listener = (ActionListener<FieldCapabilitiesResponse>) invocation.getArguments()[1];
+            ActionListener<FieldCapabilitiesResponse> listener = (ActionListener<FieldCapabilitiesResponse>) invocation.getArguments()[2];
             // simulate a valid field_caps response so we can parse and correctly analyze de query
             listener.onResponse(fieldCapabilitiesResponse);
             return null;
-        }).when(qlClient).fieldCaps(any(), any());
+        }).when(qlClient).execute(eq(EsqlResolveFieldsAction.TYPE), any(), any());
 
         Client esqlClient = mock(Client.class);
         IndexResolver indexResolver = new IndexResolver(esqlClient, EsqlDataTypeRegistry.INSTANCE);
         doAnswer((Answer<Void>) invocation -> {
             @SuppressWarnings("unchecked")
-            ActionListener<FieldCapabilitiesResponse> listener = (ActionListener<FieldCapabilitiesResponse>) invocation.getArguments()[1];
+            ActionListener<FieldCapabilitiesResponse> listener = (ActionListener<FieldCapabilitiesResponse>) invocation.getArguments()[2];
             // simulate a valid field_caps response so we can parse and correctly analyze de query
             listener.onResponse(new FieldCapabilitiesResponse(indexFieldCapabilities(indices), List.of()));
             return null;
-        }).when(esqlClient).fieldCaps(any(), any());
+        }).when(esqlClient).execute(eq(EsqlResolveFieldsAction.TYPE), any(), any());
 
         var planExecutor = new PlanExecutor(indexResolver);
         var enrichResolver = mockEnrichResolver();

+ 1 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -552,6 +552,7 @@ public class Constants {
         "indices:data/read/eql/async/get",
         "indices:data/read/esql",
         "indices:data/read/esql/async/get",
+        "indices:data/read/esql/resolve_fields",
         "indices:data/read/explain",
         "indices:data/read/field_caps",
         "indices:data/read/get",