Browse Source

ESQL: Enterprise license enforcement for CCS (#118102) (#118620)

ES|QL CCS is an enterprise licensed feature. This PR enforces that no ES|QL CCS query
can proceed unless a valid enterprise or trial license is present on the querying cluster.

If a valid license is not present a 400 Bad Request error is returned explaining that an enterprise
license is needed and showing what license (if any) was found.

If a valid license is found, then the license usage timestamp will be updated. Subsequent
calls to the `GET /_license/feature_usage` endpoint will show an entry for `esql-ccs` with
the last timestamp that it was checked and used.

```
{
  "features": [
    {
      "family": null,
      "name": "esql-ccs",
      "context": null,
      "last_used": "2024-12-09T19:54:38.767Z",
      "license_level": "enterprise"
    }
  ]
}
```
Michael Peterson 10 months ago
parent
commit
63e4262223
21 changed files with 935 additions and 472 deletions
  1. 5 0
      docs/changelog/118102.yaml
  2. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensedFeature.java
  3. 21 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java
  4. 17 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java
  5. 290 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEnrichBasedCrossClusterTestCase.java
  6. 1 2
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java
  7. 6 159
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java
  8. 2 3
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java
  9. 1 2
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java
  10. 2 244
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java
  11. 203 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueriesWithInvalidLicenseIT.java
  12. 8 9
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java
  13. 26 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlPluginWithEnterpriseOrTrialLicense.java
  14. 47 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlPluginWithNonEnterpriseOrExpiredLicense.java
  15. 4 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java
  16. 51 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlLicenseChecker.java
  17. 3 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
  18. 43 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java
  19. 158 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java
  20. 35 0
      x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java
  21. 11 52
      x-pack/qa/multi-cluster-search-security/legacy-with-basic-license/src/test/resources/rest-api-spec/test/querying_cluster/80_esql.yml

+ 5 - 0
docs/changelog/118102.yaml

@@ -0,0 +1,5 @@
+pr: 118102
+summary: "ESQL: Enterprise license enforcement for CCS"
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensedFeature.java

@@ -104,7 +104,7 @@ public abstract class LicensedFeature {
         return needsActive;
     }
 
-    /** Create a momentary feature for hte given license level */
+    /** Create a momentary feature for the given license level */
     public static Momentary momentary(String family, String name, License.OperationMode licenseLevel) {
         return new Momentary(family, name, licenseLevel, true);
     }

+ 21 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java

@@ -106,6 +106,7 @@ public class XPackLicenseState {
         messages.put(XPackField.CCR, XPackLicenseState::ccrAcknowledgementMessages);
         messages.put(XPackField.ENTERPRISE_SEARCH, XPackLicenseState::enterpriseSearchAcknowledgementMessages);
         messages.put(XPackField.REDACT_PROCESSOR, XPackLicenseState::redactProcessorAcknowledgementMessages);
+        messages.put(XPackField.ESQL, XPackLicenseState::esqlAcknowledgementMessages);
         ACKNOWLEDGMENT_MESSAGES = Collections.unmodifiableMap(messages);
     }
 
@@ -243,6 +244,26 @@ public class XPackLicenseState {
         return Strings.EMPTY_ARRAY;
     }
 
+    private static String[] esqlAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) {
+        /*
+         * Provide an acknowledgement warning to customers that downgrade from Trial or Enterprise to a lower
+         * license level (Basic, Standard, Gold or Premium) that they will no longer be able to do CCS in ES|QL.
+         */
+        switch (newMode) {
+            case BASIC:
+            case STANDARD:
+            case GOLD:
+            case PLATINUM:
+                switch (currentMode) {
+                    case TRIAL:
+                    case ENTERPRISE:
+                        return new String[] { "ES|QL cross-cluster search will be disabled." };
+                }
+                break;
+        }
+        return Strings.EMPTY_ARRAY;
+    }
+
     private static String[] machineLearningAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) {
         switch (newMode) {
             case BASIC:

+ 17 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.XPackField;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -59,6 +60,12 @@ public class XPackLicenseStateTests extends ESTestCase {
         assertEquals(expectedMessages, gotMessages.length);
     }
 
+    void assertAckMessages(String feature, OperationMode from, OperationMode to, Set<String> expectedMessages) {
+        String[] gotMessages = XPackLicenseState.ACKNOWLEDGMENT_MESSAGES.get(feature).apply(from, to);
+        Set<String> actualMessages = Arrays.stream(gotMessages).collect(Collectors.toSet());
+        assertThat(actualMessages, equalTo(expectedMessages));
+    }
+
     static <T> T randomFrom(T[] values, Predicate<T> filter) {
         return randomFrom(Arrays.stream(values).filter(filter).collect(Collectors.toList()));
     }
@@ -143,6 +150,16 @@ public class XPackLicenseStateTests extends ESTestCase {
         assertAckMessages(XPackField.CCR, randomTrialOrPlatinumMode(), randomBasicStandardOrGold(), 1);
     }
 
+    public void testEsqlAckToTrialOrPlatinum() {
+        assertAckMessages(XPackField.ESQL, randomMode(), randomFrom(TRIAL, ENTERPRISE), 0);
+    }
+
+    public void testEsqlAckTrialOrEnterpriseToNotTrialOrEnterprise() {
+        for (OperationMode to : List.of(BASIC, STANDARD, GOLD, PLATINUM)) {
+            assertAckMessages(XPackField.ESQL, randomFrom(TRIAL, ENTERPRISE), to, Set.of("ES|QL cross-cluster search will be disabled."));
+        }
+    }
+
     public void testExpiredLicense() {
         // use standard feature which would normally be allowed at all license levels
         LicensedFeature feature = LicensedFeature.momentary("family", "enterpriseFeature", STANDARD);

+ 290 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEnrichBasedCrossClusterTestCase.java

@@ -0,0 +1,290 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.TransportAction;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.ingest.common.IngestCommonPlugin;
+import org.elasticsearch.injection.guice.Inject;
+import org.elasticsearch.license.LicenseService;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.protocol.xpack.XPackInfoRequest;
+import org.elasticsearch.protocol.xpack.XPackInfoResponse;
+import org.elasticsearch.reindex.ReindexPlugin;
+import org.elasticsearch.test.AbstractMultiClustersTestCase;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
+import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
+import org.elasticsearch.xpack.core.action.XPackInfoFeatureResponse;
+import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
+import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
+import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
+import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
+import org.elasticsearch.xpack.enrich.EnrichPlugin;
+import org.elasticsearch.xpack.esql.EsqlTestUtils;
+import org.elasticsearch.xpack.esql.plan.logical.Enrich;
+import org.junit.After;
+import org.junit.Before;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.containsString;
+
+public abstract class AbstractEnrichBasedCrossClusterTestCase extends AbstractMultiClustersTestCase {
+
+    public static String REMOTE_CLUSTER_1 = "c1";
+    public static String REMOTE_CLUSTER_2 = "c2";
+
+    /**
+     * subclasses should override if they don't want enrich policies wiped after each test method run
+     */
+    protected boolean tolerateErrorsWhenWipingEnrichPolicies() {
+        return false;
+    }
+
+    @Override
+    protected List<String> remoteClusterAlias() {
+        return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
+    }
+
+    protected Collection<String> allClusters() {
+        return CollectionUtils.appendToCopy(remoteClusterAlias(), LOCAL_CLUSTER);
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
+        plugins.add(CrossClustersEnrichIT.LocalStateEnrich.class);
+        plugins.add(IngestCommonPlugin.class);
+        plugins.add(ReindexPlugin.class);
+        return plugins;
+    }
+
+    @Override
+    protected Settings nodeSettings() {
+        return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
+    }
+
+    static final EnrichPolicy hostPolicy = new EnrichPolicy("match", null, List.of("hosts"), "ip", List.of("ip", "os"));
+    static final EnrichPolicy vendorPolicy = new EnrichPolicy("match", null, List.of("vendors"), "os", List.of("os", "vendor"));
+
+    @Before
+    public void setupHostsEnrich() {
+        // the hosts policy are identical on every node
+        Map<String, String> allHosts = Map.of(
+            "192.168.1.2",
+            "Windows",
+            "192.168.1.3",
+            "MacOS",
+            "192.168.1.4",
+            "Linux",
+            "192.168.1.5",
+            "Android",
+            "192.168.1.6",
+            "iOS",
+            "192.168.1.7",
+            "Windows",
+            "192.168.1.8",
+            "MacOS",
+            "192.168.1.9",
+            "Linux",
+            "192.168.1.10",
+            "Linux",
+            "192.168.1.11",
+            "Windows"
+        );
+        for (String cluster : allClusters()) {
+            Client client = client(cluster);
+            client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get();
+            for (Map.Entry<String, String> h : allHosts.entrySet()) {
+                client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get();
+            }
+            client.admin().indices().prepareRefresh("hosts").get();
+            client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy))
+                .actionGet();
+            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts"))
+                .actionGet();
+            assertAcked(client.admin().indices().prepareDelete("hosts"));
+        }
+    }
+
+    @Before
+    public void setupVendorPolicy() {
+        var localVendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Samsung", "Linux", "Redhat");
+        var c1Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Google", "Linux", "Suse");
+        var c2Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Sony", "Linux", "Ubuntu");
+        var vendors = Map.of(LOCAL_CLUSTER, localVendors, REMOTE_CLUSTER_1, c1Vendors, REMOTE_CLUSTER_2, c2Vendors);
+        for (Map.Entry<String, Map<String, String>> e : vendors.entrySet()) {
+            Client client = client(e.getKey());
+            client.admin().indices().prepareCreate("vendors").setMapping("os", "type=keyword", "vendor", "type=keyword").get();
+            for (Map.Entry<String, String> v : e.getValue().entrySet()) {
+                client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get();
+            }
+            client.admin().indices().prepareRefresh("vendors").get();
+            client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", vendorPolicy))
+                .actionGet();
+            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors"))
+                .actionGet();
+            assertAcked(client.admin().indices().prepareDelete("vendors"));
+        }
+    }
+
+    @Before
+    public void setupEventsIndices() {
+        record Event(long timestamp, String user, String host) {
+
+        }
+        List<Event> e0 = List.of(
+            new Event(1, "matthew", "192.168.1.3"),
+            new Event(2, "simon", "192.168.1.5"),
+            new Event(3, "park", "192.168.1.2"),
+            new Event(4, "andrew", "192.168.1.7"),
+            new Event(5, "simon", "192.168.1.20"),
+            new Event(6, "kevin", "192.168.1.2"),
+            new Event(7, "akio", "192.168.1.5"),
+            new Event(8, "luke", "192.168.1.2"),
+            new Event(9, "jack", "192.168.1.4")
+        );
+        List<Event> e1 = List.of(
+            new Event(1, "andres", "192.168.1.2"),
+            new Event(2, "sergio", "192.168.1.6"),
+            new Event(3, "kylian", "192.168.1.8"),
+            new Event(4, "andrew", "192.168.1.9"),
+            new Event(5, "jack", "192.168.1.3"),
+            new Event(6, "kevin", "192.168.1.4"),
+            new Event(7, "akio", "192.168.1.7"),
+            new Event(8, "kevin", "192.168.1.21"),
+            new Event(9, "andres", "192.168.1.8")
+        );
+        List<Event> e2 = List.of(
+            new Event(1, "park", "192.168.1.25"),
+            new Event(2, "akio", "192.168.1.5"),
+            new Event(3, "park", "192.168.1.2"),
+            new Event(4, "kevin", "192.168.1.3")
+        );
+        for (var c : Map.of(LOCAL_CLUSTER, e0, REMOTE_CLUSTER_1, e1, REMOTE_CLUSTER_2, e2).entrySet()) {
+            Client client = client(c.getKey());
+            client.admin()
+                .indices()
+                .prepareCreate("events")
+                .setMapping("timestamp", "type=long", "user", "type=keyword", "host", "type=ip")
+                .get();
+            for (var e : c.getValue()) {
+                client.prepareIndex("events").setSource("timestamp", e.timestamp, "user", e.user, "host", e.host).get();
+            }
+            client.admin().indices().prepareRefresh("events").get();
+        }
+    }
+
+    @After
+    public void wipeEnrichPolicies() {
+        for (String cluster : allClusters()) {
+            cluster(cluster).wipe(Set.of());
+            for (String policy : List.of("hosts", "vendors")) {
+                if (tolerateErrorsWhenWipingEnrichPolicies()) {
+                    try {
+                        client(cluster).execute(
+                            DeleteEnrichPolicyAction.INSTANCE,
+                            new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)
+                        );
+                    } catch (Exception e) {
+                        assertThat(e.getMessage(), containsString("Cluster is already closed"));
+                    }
+
+                } else {
+                    client(cluster).execute(
+                        DeleteEnrichPolicyAction.INSTANCE,
+                        new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)
+                    );
+                }
+            }
+        }
+    }
+
+    static String enrichHosts(Enrich.Mode mode) {
+        return EsqlTestUtils.randomEnrichCommand("hosts", mode, hostPolicy.getMatchField(), hostPolicy.getEnrichFields());
+    }
+
+    static String enrichVendors(Enrich.Mode mode) {
+        return EsqlTestUtils.randomEnrichCommand("vendors", mode, vendorPolicy.getMatchField(), vendorPolicy.getEnrichFields());
+    }
+
+    protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) {
+        EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
+        request.query(query);
+        request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
+        if (randomBoolean()) {
+            request.profile(true);
+        }
+        if (ccsMetadataInResponse != null) {
+            request.includeCCSMetadata(ccsMetadataInResponse);
+        }
+        return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
+    }
+
+    public static Tuple<Boolean, Boolean> randomIncludeCCSMetadata() {
+        return switch (randomIntBetween(1, 3)) {
+            case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE);
+            case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE);
+            case 3 -> new Tuple<>(null, Boolean.FALSE);
+            default -> throw new AssertionError("should not get here");
+        };
+    }
+
+    public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin {
+        public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception {
+            super(settings, configPath);
+
+            plugins.add(new EnrichPlugin(settings) {
+                @Override
+                protected XPackLicenseState getLicenseState() {
+                    return this.getLicenseState();
+                }
+            });
+        }
+
+        public static class EnrichTransportXPackInfoAction extends TransportXPackInfoAction {
+            @Inject
+            public EnrichTransportXPackInfoAction(
+                TransportService transportService,
+                ActionFilters actionFilters,
+                LicenseService licenseService,
+                NodeClient client
+            ) {
+                super(transportService, actionFilters, licenseService, client);
+            }
+
+            @Override
+            protected List<ActionType<XPackInfoFeatureResponse>> infoActions() {
+                return Collections.singletonList(XPackInfoFeatureAction.ENRICH);
+            }
+        }
+
+        @Override
+        protected Class<? extends TransportAction<XPackInfoRequest, XPackInfoResponse>> getInfoAction() {
+            return CrossClustersQueriesWithInvalidLicenseIT.LocalStateEnrich.EnrichTransportXPackInfoAction.class;
+        }
+    }
+}

+ 1 - 2
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java

@@ -35,7 +35,6 @@ import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
-import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.junit.Before;
 
 import java.io.IOException;
@@ -78,7 +77,7 @@ public class CrossClusterAsyncQueryIT extends AbstractMultiClustersTestCase {
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
         List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
-        plugins.add(EsqlPlugin.class);
+        plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
         plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action
         plugins.add(InternalExchangePlugin.class);
         plugins.add(PauseFieldPlugin.class);

+ 6 - 159
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterEnrichUnavailableClustersIT.java

@@ -8,36 +8,21 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.core.Tuple;
-import org.elasticsearch.ingest.common.IngestCommonPlugin;
 import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.reindex.ReindexPlugin;
-import org.elasticsearch.test.AbstractMultiClustersTestCase;
 import org.elasticsearch.transport.RemoteClusterAware;
-import org.elasticsearch.xpack.core.XPackSettings;
-import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
-import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
-import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
-import org.junit.Before;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
-import static org.elasticsearch.xpack.esql.action.CrossClustersEnrichIT.enrichHosts;
-import static org.elasticsearch.xpack.esql.action.CrossClustersEnrichIT.enrichVendors;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -47,151 +32,26 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
  * This IT test is the dual of CrossClustersEnrichIT, which tests "happy path"
  * and this one tests unavailable cluster scenarios using (most of) the same tests.
  */
-public class CrossClusterEnrichUnavailableClustersIT extends AbstractMultiClustersTestCase {
-
-    public static String REMOTE_CLUSTER_1 = "c1";
-    public static String REMOTE_CLUSTER_2 = "c2";
-
-    @Override
-    protected Collection<String> remoteClusterAlias() {
-        return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
-    }
+public class CrossClusterEnrichUnavailableClustersIT extends AbstractEnrichBasedCrossClusterTestCase {
 
     @Override
     protected boolean reuseClusters() {
         return false;
     }
 
-    private Collection<String> allClusters() {
-        return CollectionUtils.appendToCopy(remoteClusterAlias(), LOCAL_CLUSTER);
+    @Override
+    protected boolean tolerateErrorsWhenWipingEnrichPolicies() {
+        // attempt to wipe will fail since some clusters are already closed
+        return true;
     }
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
         List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
-        plugins.add(EsqlPlugin.class);
-        plugins.add(CrossClustersEnrichIT.LocalStateEnrich.class);
-        plugins.add(IngestCommonPlugin.class);
-        plugins.add(ReindexPlugin.class);
+        plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
         return plugins;
     }
 
-    @Override
-    protected Settings nodeSettings() {
-        return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
-    }
-
-    @Before
-    public void setupHostsEnrich() {
-        // the hosts policy are identical on every node
-        Map<String, String> allHosts = Map.of(
-            "192.168.1.2",
-            "Windows",
-            "192.168.1.3",
-            "MacOS",
-            "192.168.1.4",
-            "Linux",
-            "192.168.1.5",
-            "Android",
-            "192.168.1.6",
-            "iOS",
-            "192.168.1.7",
-            "Windows",
-            "192.168.1.8",
-            "MacOS",
-            "192.168.1.9",
-            "Linux",
-            "192.168.1.10",
-            "Linux",
-            "192.168.1.11",
-            "Windows"
-        );
-        for (String cluster : allClusters()) {
-            Client client = client(cluster);
-            client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get();
-            for (Map.Entry<String, String> h : allHosts.entrySet()) {
-                client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get();
-            }
-            client.admin().indices().prepareRefresh("hosts").get();
-            client.execute(
-                PutEnrichPolicyAction.INSTANCE,
-                new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", CrossClustersEnrichIT.hostPolicy)
-            ).actionGet();
-            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts"))
-                .actionGet();
-            assertAcked(client.admin().indices().prepareDelete("hosts"));
-        }
-    }
-
-    @Before
-    public void setupVendorPolicy() {
-        var localVendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Samsung", "Linux", "Redhat");
-        var c1Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Google", "Linux", "Suse");
-        var c2Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Sony", "Linux", "Ubuntu");
-        var vendors = Map.of(LOCAL_CLUSTER, localVendors, "c1", c1Vendors, "c2", c2Vendors);
-        for (Map.Entry<String, Map<String, String>> e : vendors.entrySet()) {
-            Client client = client(e.getKey());
-            client.admin().indices().prepareCreate("vendors").setMapping("os", "type=keyword", "vendor", "type=keyword").get();
-            for (Map.Entry<String, String> v : e.getValue().entrySet()) {
-                client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get();
-            }
-            client.admin().indices().prepareRefresh("vendors").get();
-            client.execute(
-                PutEnrichPolicyAction.INSTANCE,
-                new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", CrossClustersEnrichIT.vendorPolicy)
-            ).actionGet();
-            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors"))
-                .actionGet();
-            assertAcked(client.admin().indices().prepareDelete("vendors"));
-        }
-    }
-
-    @Before
-    public void setupEventsIndices() {
-        record Event(long timestamp, String user, String host) {}
-
-        List<Event> e0 = List.of(
-            new Event(1, "matthew", "192.168.1.3"),
-            new Event(2, "simon", "192.168.1.5"),
-            new Event(3, "park", "192.168.1.2"),
-            new Event(4, "andrew", "192.168.1.7"),
-            new Event(5, "simon", "192.168.1.20"),
-            new Event(6, "kevin", "192.168.1.2"),
-            new Event(7, "akio", "192.168.1.5"),
-            new Event(8, "luke", "192.168.1.2"),
-            new Event(9, "jack", "192.168.1.4")
-        );
-        List<Event> e1 = List.of(
-            new Event(1, "andres", "192.168.1.2"),
-            new Event(2, "sergio", "192.168.1.6"),
-            new Event(3, "kylian", "192.168.1.8"),
-            new Event(4, "andrew", "192.168.1.9"),
-            new Event(5, "jack", "192.168.1.3"),
-            new Event(6, "kevin", "192.168.1.4"),
-            new Event(7, "akio", "192.168.1.7"),
-            new Event(8, "kevin", "192.168.1.21"),
-            new Event(9, "andres", "192.168.1.8")
-        );
-        List<Event> e2 = List.of(
-            new Event(1, "park", "192.168.1.25"),
-            new Event(2, "akio", "192.168.1.5"),
-            new Event(3, "park", "192.168.1.2"),
-            new Event(4, "kevin", "192.168.1.3")
-        );
-        for (var c : Map.of(LOCAL_CLUSTER, e0, "c1", e1, "c2", e2).entrySet()) {
-            Client client = client(c.getKey());
-            client.admin()
-                .indices()
-                .prepareCreate("events")
-                .setMapping("timestamp", "type=long", "user", "type=keyword", "host", "type=ip")
-                .get();
-            for (var e : c.getValue()) {
-                client.prepareIndex("events").setSource("timestamp", e.timestamp, "user", e.user, "host", e.host).get();
-            }
-            client.admin().indices().prepareRefresh("events").get();
-        }
-    }
-
     public void testEnrichWithHostsPolicyAndDisconnectedRemotesWithSkipUnavailableTrue() throws IOException {
         setSkipUnavailable(REMOTE_CLUSTER_1, true);
         setSkipUnavailable(REMOTE_CLUSTER_2, true);
@@ -645,19 +505,6 @@ public class CrossClusterEnrichUnavailableClustersIT extends AbstractMultiCluste
         }
     }
 
-    protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) {
-        EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
-        request.query(query);
-        request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
-        if (randomBoolean()) {
-            request.profile(true);
-        }
-        if (ccsMetadataInResponse != null) {
-            request.includeCCSMetadata(ccsMetadataInResponse);
-        }
-        return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
-    }
-
     private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) {
         assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
         assertTrue(executionInfo.isCrossClusterSearch());

+ 2 - 3
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryUnavailableRemotesIT.java

@@ -18,7 +18,6 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.AbstractMultiClustersTestCase;
 import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.xpack.esql.core.type.DataType;
-import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -54,8 +53,8 @@ public class CrossClusterQueryUnavailableRemotesIT extends AbstractMultiClusters
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
         List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
-        plugins.add(EsqlPlugin.class);
-        plugins.add(org.elasticsearch.xpack.esql.action.CrossClustersQueryIT.InternalExchangePlugin.class);
+        plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
+        plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class);
         return plugins;
     }
 

+ 1 - 2
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersCancellationIT.java

@@ -33,7 +33,6 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.esql.plugin.ComputeService;
-import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.junit.Before;
 
 import java.util.ArrayList;
@@ -62,7 +61,7 @@ public class CrossClustersCancellationIT extends AbstractMultiClustersTestCase {
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
         List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
-        plugins.add(EsqlPlugin.class);
+        plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
         plugins.add(InternalExchangePlugin.class);
         plugins.add(PauseFieldPlugin.class);
         return plugins;

+ 2 - 244
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java

@@ -7,218 +7,34 @@
 
 package org.elasticsearch.xpack.esql.action;
 
-import org.elasticsearch.action.ActionType;
-import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.TransportAction;
-import org.elasticsearch.client.internal.Client;
-import org.elasticsearch.client.internal.node.NodeClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.core.Tuple;
-import org.elasticsearch.ingest.common.IngestCommonPlugin;
-import org.elasticsearch.injection.guice.Inject;
-import org.elasticsearch.license.LicenseService;
-import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.protocol.xpack.XPackInfoRequest;
-import org.elasticsearch.protocol.xpack.XPackInfoResponse;
-import org.elasticsearch.reindex.ReindexPlugin;
-import org.elasticsearch.test.AbstractMultiClustersTestCase;
-import org.elasticsearch.transport.TransportService;
-import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
-import org.elasticsearch.xpack.core.XPackSettings;
-import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
-import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
-import org.elasticsearch.xpack.core.action.XPackInfoFeatureResponse;
-import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
-import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
-import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
-import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
-import org.elasticsearch.xpack.enrich.EnrichPlugin;
-import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
-import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
-import org.junit.After;
-import org.junit.Before;
 
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
-public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
-
-    @Override
-    protected Collection<String> remoteClusterAlias() {
-        return List.of("c1", "c2");
-    }
-
-    protected Collection<String> allClusters() {
-        return CollectionUtils.appendToCopy(remoteClusterAlias(), LOCAL_CLUSTER);
-    }
+public class CrossClustersEnrichIT extends AbstractEnrichBasedCrossClusterTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
         List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
-        plugins.add(EsqlPlugin.class);
-        plugins.add(LocalStateEnrich.class);
-        plugins.add(IngestCommonPlugin.class);
-        plugins.add(ReindexPlugin.class);
+        plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
         return plugins;
     }
 
-    @Override
-    protected Settings nodeSettings() {
-        return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
-    }
-
-    static final EnrichPolicy hostPolicy = new EnrichPolicy("match", null, List.of("hosts"), "ip", List.of("ip", "os"));
-    static final EnrichPolicy vendorPolicy = new EnrichPolicy("match", null, List.of("vendors"), "os", List.of("os", "vendor"));
-
-    @Before
-    public void setupHostsEnrich() {
-        // the hosts policy are identical on every node
-        Map<String, String> allHosts = Map.of(
-            "192.168.1.2",
-            "Windows",
-            "192.168.1.3",
-            "MacOS",
-            "192.168.1.4",
-            "Linux",
-            "192.168.1.5",
-            "Android",
-            "192.168.1.6",
-            "iOS",
-            "192.168.1.7",
-            "Windows",
-            "192.168.1.8",
-            "MacOS",
-            "192.168.1.9",
-            "Linux",
-            "192.168.1.10",
-            "Linux",
-            "192.168.1.11",
-            "Windows"
-        );
-        for (String cluster : allClusters()) {
-            Client client = client(cluster);
-            client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get();
-            for (Map.Entry<String, String> h : allHosts.entrySet()) {
-                client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get();
-            }
-            client.admin().indices().prepareRefresh("hosts").get();
-            client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy))
-                .actionGet();
-            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts"))
-                .actionGet();
-            assertAcked(client.admin().indices().prepareDelete("hosts"));
-        }
-    }
-
-    @Before
-    public void setupVendorPolicy() {
-        var localVendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Samsung", "Linux", "Redhat");
-        var c1Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Google", "Linux", "Suse");
-        var c2Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Sony", "Linux", "Ubuntu");
-        var vendors = Map.of(LOCAL_CLUSTER, localVendors, "c1", c1Vendors, "c2", c2Vendors);
-        for (Map.Entry<String, Map<String, String>> e : vendors.entrySet()) {
-            Client client = client(e.getKey());
-            client.admin().indices().prepareCreate("vendors").setMapping("os", "type=keyword", "vendor", "type=keyword").get();
-            for (Map.Entry<String, String> v : e.getValue().entrySet()) {
-                client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get();
-            }
-            client.admin().indices().prepareRefresh("vendors").get();
-            client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", vendorPolicy))
-                .actionGet();
-            client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors"))
-                .actionGet();
-            assertAcked(client.admin().indices().prepareDelete("vendors"));
-        }
-    }
-
-    @Before
-    public void setupEventsIndices() {
-        record Event(long timestamp, String user, String host) {
-
-        }
-        List<Event> e0 = List.of(
-            new Event(1, "matthew", "192.168.1.3"),
-            new Event(2, "simon", "192.168.1.5"),
-            new Event(3, "park", "192.168.1.2"),
-            new Event(4, "andrew", "192.168.1.7"),
-            new Event(5, "simon", "192.168.1.20"),
-            new Event(6, "kevin", "192.168.1.2"),
-            new Event(7, "akio", "192.168.1.5"),
-            new Event(8, "luke", "192.168.1.2"),
-            new Event(9, "jack", "192.168.1.4")
-        );
-        List<Event> e1 = List.of(
-            new Event(1, "andres", "192.168.1.2"),
-            new Event(2, "sergio", "192.168.1.6"),
-            new Event(3, "kylian", "192.168.1.8"),
-            new Event(4, "andrew", "192.168.1.9"),
-            new Event(5, "jack", "192.168.1.3"),
-            new Event(6, "kevin", "192.168.1.4"),
-            new Event(7, "akio", "192.168.1.7"),
-            new Event(8, "kevin", "192.168.1.21"),
-            new Event(9, "andres", "192.168.1.8")
-        );
-        List<Event> e2 = List.of(
-            new Event(1, "park", "192.168.1.25"),
-            new Event(2, "akio", "192.168.1.5"),
-            new Event(3, "park", "192.168.1.2"),
-            new Event(4, "kevin", "192.168.1.3")
-        );
-        for (var c : Map.of(LOCAL_CLUSTER, e0, "c1", e1, "c2", e2).entrySet()) {
-            Client client = client(c.getKey());
-            client.admin()
-                .indices()
-                .prepareCreate("events")
-                .setMapping("timestamp", "type=long", "user", "type=keyword", "host", "type=ip")
-                .get();
-            for (var e : c.getValue()) {
-                client.prepareIndex("events").setSource("timestamp", e.timestamp, "user", e.user, "host", e.host).get();
-            }
-            client.admin().indices().prepareRefresh("events").get();
-        }
-    }
-
-    @After
-    public void wipeEnrichPolicies() {
-        for (String cluster : allClusters()) {
-            cluster(cluster).wipe(Set.of());
-            for (String policy : List.of("hosts", "vendors")) {
-                client(cluster).execute(
-                    DeleteEnrichPolicyAction.INSTANCE,
-                    new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)
-                );
-            }
-        }
-    }
-
-    static String enrichHosts(Enrich.Mode mode) {
-        return EsqlTestUtils.randomEnrichCommand("hosts", mode, hostPolicy.getMatchField(), hostPolicy.getEnrichFields());
-    }
-
-    static String enrichVendors(Enrich.Mode mode) {
-        return EsqlTestUtils.randomEnrichCommand("vendors", mode, vendorPolicy.getMatchField(), vendorPolicy.getEnrichFields());
-    }
-
     public void testWithHostsPolicy() {
         for (var mode : Enrich.Mode.values()) {
             String query = "FROM events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os";
@@ -606,19 +422,6 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
         );
     }
 
-    protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) {
-        EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
-        request.query(query);
-        request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
-        if (randomBoolean()) {
-            request.profile(true);
-        }
-        if (ccsMetadataInResponse != null) {
-            request.includeCCSMetadata(ccsMetadataInResponse);
-        }
-        return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
-    }
-
     private static void assertCCSExecutionInfoDetails(EsqlExecutionInfo executionInfo) {
         assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
         assertTrue(executionInfo.isCrossClusterSearch());
@@ -637,49 +440,4 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
             assertThat(cluster.getFailedShards(), equalTo(0));
         }
     }
-
-    public static Tuple<Boolean, Boolean> randomIncludeCCSMetadata() {
-        return switch (randomIntBetween(1, 3)) {
-            case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE);
-            case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE);
-            case 3 -> new Tuple<>(null, Boolean.FALSE);
-            default -> throw new AssertionError("should not get here");
-        };
-    }
-
-    public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin {
-
-        public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception {
-            super(settings, configPath);
-
-            plugins.add(new EnrichPlugin(settings) {
-                @Override
-                protected XPackLicenseState getLicenseState() {
-                    return this.getLicenseState();
-                }
-            });
-        }
-
-        public static class EnrichTransportXPackInfoAction extends TransportXPackInfoAction {
-            @Inject
-            public EnrichTransportXPackInfoAction(
-                TransportService transportService,
-                ActionFilters actionFilters,
-                LicenseService licenseService,
-                NodeClient client
-            ) {
-                super(transportService, actionFilters, licenseService, client);
-            }
-
-            @Override
-            protected List<ActionType<XPackInfoFeatureResponse>> infoActions() {
-                return Collections.singletonList(XPackInfoFeatureAction.ENRICH);
-            }
-        }
-
-        @Override
-        protected Class<? extends TransportAction<XPackInfoRequest, XPackInfoResponse>> getInfoAction() {
-            return EnrichTransportXPackInfoAction.class;
-        }
-    }
 }

+ 203 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueriesWithInvalidLicenseIT.java

@@ -0,0 +1,203 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.xpack.esql.plan.logical.Enrich;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+public class CrossClustersQueriesWithInvalidLicenseIT extends AbstractEnrichBasedCrossClusterTestCase {
+
+    private static final String LICENSE_ERROR_MESSAGE = "A valid Enterprise license is required to run ES|QL cross-cluster searches.";
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
+        plugins.add(EsqlPluginWithNonEnterpriseOrExpiredLicense.class);  // key plugin for the test
+        return plugins;
+    }
+
+    public void testBasicCrossClusterQuery() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        ElasticsearchStatusException e = expectThrows(
+            ElasticsearchStatusException.class,
+            () -> runQuery("FROM *,*:* | LIMIT 5", requestIncludeMeta)
+        );
+        assertThat(e.getMessage(), containsString(LICENSE_ERROR_MESSAGE));
+    }
+
+    public void testMetadataCrossClusterQuery() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        ElasticsearchStatusException e = expectThrows(
+            ElasticsearchStatusException.class,
+            () -> runQuery("FROM events,*:* METADATA _index | SORT _index", requestIncludeMeta)
+        );
+        assertThat(e.getMessage(), containsString(LICENSE_ERROR_MESSAGE));
+    }
+
+    public void testQueryAgainstNonMatchingClusterWildcardPattern() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
+        // since this wildcarded expression does not resolve to a valid remote cluster, it is not considered
+        // a cross-cluster search and thus should not throw a license error
+        String q = "FROM xremote*:events";
+        {
+            String limit1 = q + " | STATS count(*)";
+            try (EsqlQueryResponse resp = runQuery(limit1, requestIncludeMeta)) {
+                assertThat(resp.columns().size(), equalTo(1));
+                EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+                assertThat(executionInfo.isCrossClusterSearch(), is(false));
+                assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+            }
+
+            String limit0 = q + " | LIMIT 0";
+            try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) {
+                assertThat(resp.columns().size(), equalTo(1));
+                assertThat(getValuesList(resp).size(), equalTo(0));
+                EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+                assertThat(executionInfo.isCrossClusterSearch(), is(false));
+                assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+            }
+        }
+    }
+
+    public void testCCSWithLimit0() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+
+        // local only query does not need a valid Enterprise or Trial license
+        try (EsqlQueryResponse resp = runQuery("FROM events | LIMIT 0", requestIncludeMeta)) {
+            EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            assertNotNull(executionInfo);
+            assertThat(executionInfo.isCrossClusterSearch(), is(false));
+            assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+        }
+
+        // cross-cluster searches should fail with license error
+        String q = randomFrom("FROM events,c1:* | LIMIT 0", "FROM c1:* | LIMIT 0");
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> runQuery(q, requestIncludeMeta));
+        assertThat(e.getMessage(), containsString(LICENSE_ERROR_MESSAGE));
+    }
+
+    public void testSearchesWhereNonExistentClusterIsSpecified() {
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+        boolean responseExpectMeta = includeCCSMetadata.v2();
+
+        // this one query should be allowed since x* does not resolve to any known remote cluster
+        try (EsqlQueryResponse resp = runQuery("FROM events,x*:no_such_index* | STATS count(*)", requestIncludeMeta)) {
+            EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
+            List<List<Object>> values = getValuesList(resp);
+            assertThat(values, hasSize(1));
+
+            assertNotNull(executionInfo);
+            assertThat(executionInfo.clusterAliases(), equalTo(Set.of(LOCAL_CLUSTER)));
+            assertThat(executionInfo.isCrossClusterSearch(), is(false));
+            assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
+            // since this not a CCS, only the overall took time in the EsqlExecutionInfo matters
+            assertThat(executionInfo.overallTook().millis(), greaterThanOrEqualTo(0L));
+        }
+
+        ElasticsearchStatusException e = expectThrows(
+            ElasticsearchStatusException.class,
+            () -> runQuery("FROM events,no_such_cluster:no_such_index* | STATS count(*)", requestIncludeMeta)
+        );
+        // with a valid license this would throw "no such remote cluster" exception, but without a valid license, should get a license error
+        assertThat(e.getMessage(), containsString(LICENSE_ERROR_MESSAGE));
+    }
+
+    public void testEnrichWithHostsPolicy() {
+        // local-only queries do not need an Enterprise or Trial license
+        for (var mode : Enrich.Mode.values()) {
+            String query = "FROM events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os";
+            try (EsqlQueryResponse resp = runQuery(query, null)) {
+                List<List<Object>> rows = getValuesList(resp);
+                assertThat(
+                    rows,
+                    equalTo(
+                        List.of(
+                            List.of(2L, "Android"),
+                            List.of(1L, "Linux"),
+                            List.of(1L, "MacOS"),
+                            List.of(4L, "Windows"),
+                            Arrays.asList(1L, (String) null)
+                        )
+                    )
+                );
+                assertFalse(resp.getExecutionInfo().isCrossClusterSearch());
+            }
+        }
+
+        // cross-cluster query should fail due to not having valid Enterprise or Trial license
+        Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
+        Boolean requestIncludeMeta = includeCCSMetadata.v1();
+
+        for (var mode : Enrich.Mode.values()) {
+            String query = "FROM *:events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os";
+            ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> runQuery(query, requestIncludeMeta));
+            assertThat(e.getMessage(), containsString("A valid Enterprise license is required to run ES|QL cross-cluster searches."));
+        }
+
+        for (var mode : Enrich.Mode.values()) {
+            String query = "FROM *:events,events | eval ip= TO_STR(host) | " + enrichHosts(mode) + " | stats c = COUNT(*) by os | SORT os";
+            ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> runQuery(query, requestIncludeMeta));
+            assertThat(e.getMessage(), containsString("A valid Enterprise license is required to run ES|QL cross-cluster searches."));
+        }
+    }
+
+    public void testAggThenEnrichRemote() {
+        String query = String.format(Locale.ROOT, """
+            FROM *:events,events
+            | eval ip= TO_STR(host)
+            | %s
+            | stats c = COUNT(*) by os
+            | %s
+            | sort vendor
+            """, enrichHosts(Enrich.Mode.ANY), enrichVendors(Enrich.Mode.REMOTE));
+        var error = expectThrows(ElasticsearchStatusException.class, () -> runQuery(query, randomBoolean()).close());
+        // with a valid license this would fail with "ENRICH with remote policy can't be executed after STATS", so ensure here
+        // that the license error is detected first and returned rather than a VerificationException
+        assertThat(error.getMessage(), containsString(LICENSE_ERROR_MESSAGE));
+    }
+
+    public void testEnrichCoordinatorThenEnrichRemote() {
+        String query = String.format(Locale.ROOT, """
+            FROM *:events,events
+            | eval ip= TO_STR(host)
+            | %s
+            | %s
+            | sort vendor
+            """, enrichHosts(Enrich.Mode.COORDINATOR), enrichVendors(Enrich.Mode.REMOTE));
+        var error = expectThrows(ElasticsearchStatusException.class, () -> runQuery(query, randomBoolean()).close());
+        assertThat(
+            error.getMessage(),
+            // with a valid license the error is "ENRICH with remote policy can't be executed after another ENRICH with coordinator policy",
+            // so ensure here that the license error is detected first and returned rather than a VerificationException
+            containsString(LICENSE_ERROR_MESSAGE)
+        );
+    }
+}

+ 8 - 9
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java

@@ -32,7 +32,6 @@ import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.transport.RemoteClusterAware;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.esql.VerificationException;
-import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 
 import java.io.IOException;
@@ -73,13 +72,13 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
 
     @Override
     protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
-        return Map.of(REMOTE_CLUSTER_1, randomBoolean());
+        return Map.of(REMOTE_CLUSTER_1, randomBoolean(), REMOTE_CLUSTER_2, randomBoolean());
     }
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
         List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
-        plugins.add(EsqlPlugin.class);
+        plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
         plugins.add(InternalExchangePlugin.class);
         return plugins;
     }
@@ -184,7 +183,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
     }
 
     public void testSearchesAgainstNonMatchingIndicesWithLocalOnly() {
-        Map<String, Object> testClusterInfo = setupClusters(2);
+        Map<String, Object> testClusterInfo = setupTwoClusters();
         String localIndex = (String) testClusterInfo.get("local.index");
 
         {
@@ -905,7 +904,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         // cluster-foo* matches nothing and so should not be present in the EsqlExecutionInfo
         try (
             EsqlQueryResponse resp = runQuery(
-                "from logs-*,no_such_index*,cluster-a:no_such_index*,cluster-foo*:* | stats sum (v)",
+                "FROM logs-*,no_such_index*,cluster-a:no_such_index*,cluster-foo*:* | STATS sum (v)",
                 requestIncludeMeta
             )
         ) {
@@ -1009,7 +1008,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
 
         try (
             EsqlQueryResponse resp = runQuery(
-                "FROM logs*,*:logs* METADATA _index | stats sum(v) by _index | sort _index",
+                Strings.format("FROM logs*,%s:logs* METADATA _index | stats sum(v) by _index | sort _index", REMOTE_CLUSTER_1),
                 requestIncludeMeta
             )
         ) {
@@ -1091,7 +1090,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         final int remoteOnlyProfiles;
         {
             EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
-            request.query("FROM *:logs* | stats sum(v)");
+            request.query("FROM c*:logs* | stats sum(v)");
             request.pragmas(pragmas);
             request.profile(true);
             try (EsqlQueryResponse resp = runQuery(request)) {
@@ -1124,7 +1123,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         final int allProfiles;
         {
             EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
-            request.query("FROM logs*,*:logs* | stats total = sum(v)");
+            request.query("FROM logs*,c*:logs* | stats total = sum(v)");
             request.pragmas(pragmas);
             request.profile(true);
             try (EsqlQueryResponse resp = runQuery(request)) {
@@ -1169,7 +1168,7 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase {
         int remoteNumShards = (Integer) testClusterInfo.get("remote.num_shards");
 
         EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
-        request.query("FROM logs*,*:logs* | EVAL ip = to_ip(id) | STATS total = sum(v) by ip | LIMIT 10");
+        request.query("FROM logs*,c*:logs* | EVAL ip = to_ip(id) | STATS total = sum(v) by ip | LIMIT 10");
         InternalTestCluster cluster = cluster(LOCAL_CLUSTER);
         String node = randomFrom(cluster.getNodeNames());
         CountDownLatch latch = new CountDownLatch(1);

+ 26 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlPluginWithEnterpriseOrTrialLicense.java

@@ -0,0 +1,26 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.license.License;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.license.internal.XPackLicenseStatus;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
+
+import static org.elasticsearch.test.ESTestCase.randomFrom;
+
+/**
+ * In IT tests, use this instead of the EsqlPlugin in order to use ES|QL features
+ * that require an Enteprise (or Trial) license.
+ */
+public class EsqlPluginWithEnterpriseOrTrialLicense extends EsqlPlugin {
+    protected XPackLicenseState getLicenseState() {
+        License.OperationMode operationMode = randomFrom(License.OperationMode.ENTERPRISE, License.OperationMode.TRIAL);
+        return new XPackLicenseState(() -> System.currentTimeMillis(), new XPackLicenseStatus(operationMode, true, "Test license expired"));
+    }
+}

+ 47 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlPluginWithNonEnterpriseOrExpiredLicense.java

@@ -0,0 +1,47 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.license.License;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.license.internal.XPackLicenseStatus;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
+
+import static org.elasticsearch.test.ESTestCase.randomBoolean;
+import static org.elasticsearch.test.ESTestCase.randomFrom;
+
+/**
+ * In IT tests, use this instead of the EsqlPlugin in order to test ES|QL features
+ * using either a:
+ *  - an active (non-expired) basic, standard, missing, gold or platinum Elasticsearch license, OR
+ *  - an expired enterprise or trial license
+ */
+public class EsqlPluginWithNonEnterpriseOrExpiredLicense extends EsqlPlugin {
+    protected XPackLicenseState getLicenseState() {
+        License.OperationMode operationMode;
+        boolean active;
+        if (randomBoolean()) {
+            operationMode = randomFrom(
+                License.OperationMode.PLATINUM,
+                License.OperationMode.GOLD,
+                License.OperationMode.BASIC,
+                License.OperationMode.MISSING,
+                License.OperationMode.STANDARD
+            );
+            active = true;
+        } else {
+            operationMode = randomFrom(License.OperationMode.ENTERPRISE, License.OperationMode.TRIAL);
+            active = false;  // expired
+        }
+
+        return new XPackLicenseState(
+            () -> System.currentTimeMillis(),
+            new XPackLicenseStatus(operationMode, active, "Test license expired")
+        );
+    }
+}

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java

@@ -610,6 +610,10 @@ public class Verifier {
         functions.forEach(f -> metrics.incFunctionMetric(f));
     }
 
+    public XPackLicenseState licenseState() {
+        return licenseState;
+    }
+
     /**
      * Limit QL's comparisons to types we support.  This should agree with
      * {@link EsqlBinaryComparison}'s checkCompatibility method

+ 51 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlLicenseChecker.java

@@ -0,0 +1,51 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.session;
+
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.license.License;
+import org.elasticsearch.license.LicensedFeature;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.rest.RestStatus;
+
+public class EsqlLicenseChecker {
+
+    public static final LicensedFeature.Momentary CCS_FEATURE = LicensedFeature.momentary(
+        null,
+        "esql-ccs",
+        License.OperationMode.ENTERPRISE
+    );
+
+    /**
+     * Only call this method once you know the user is doing a cross-cluster query, as it will update
+     * the license_usage timestamp for the esql-ccs feature if the license is Enterprise (or Trial).
+     * @param licenseState
+     * @return true if the user has a license that allows ESQL CCS.
+     */
+    public static boolean isCcsAllowed(XPackLicenseState licenseState) {
+        if (licenseState == null) {
+            return false;
+        }
+        return CCS_FEATURE.check(licenseState);
+    }
+
+    /**
+     * @param licenseState existing license state. Need to extract info on the current installed license.
+     * @return ElasticsearchStatusException with an error message informing the caller what license is needed
+     * to run ES|QL cross-cluster searches and what license (if any) was found.
+     */
+    public static ElasticsearchStatusException invalidLicenseForCcsException(XPackLicenseState licenseState) {
+        String message = "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: ";
+        if (licenseState == null) {
+            message += "none";
+        } else {
+            message += licenseState.statusDescription();
+        }
+        return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
+    }
+}

+ 3 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -298,6 +298,9 @@ public class EsqlSession {
             .map(e -> new EnrichPolicyResolver.UnresolvedPolicy((String) e.policyName().fold(), e.mode()))
             .collect(Collectors.toSet());
         final List<TableInfo> indices = preAnalysis.indices;
+
+        EsqlSessionCCSUtils.checkForCcsLicense(indices, indicesExpressionGrouper, verifier.licenseState());
+
         // TODO: make a separate call for lookup indices
         final Set<String> targetClusters = enrichPolicyResolver.groupIndicesPerCluster(
             indices.stream().flatMap(t -> Arrays.stream(Strings.commaDelimitedListToStringArray(t.id().index()))).toArray(String[]::new)

+ 43 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java

@@ -9,17 +9,24 @@ package org.elasticsearch.xpack.esql.session;
 
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
 import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.indices.IndicesExpressionGrouper;
+import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.NoSuchRemoteClusterException;
 import org.elasticsearch.transport.RemoteClusterAware;
+import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
 import org.elasticsearch.xpack.esql.analysis.Analyzer;
+import org.elasticsearch.xpack.esql.analysis.TableInfo;
 import org.elasticsearch.xpack.esql.index.IndexResolution;
 import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
 
@@ -255,6 +262,9 @@ class EsqlSessionCCSUtils {
     }
 
     private static boolean concreteIndexRequested(String indexExpression) {
+        if (Strings.isNullOrBlank(indexExpression)) {
+            return false;
+        }
         for (String expr : indexExpression.split(",")) {
             if (expr.charAt(0) == '<' || expr.startsWith("-<")) {
                 // skip date math expressions
@@ -288,4 +298,37 @@ class EsqlSessionCCSUtils {
             }
         }
     }
+
+    /**
+     * Checks the index expression for the presence of remote clusters. If found, it will ensure that the caller
+     * has a valid Enterprise (or Trial) license on the querying cluster.
+     * @param indices index expression requested by user
+     * @param indicesGrouper grouper of index expressions by cluster alias
+     * @param licenseState license state on the querying cluster
+     * @throws org.elasticsearch.ElasticsearchStatusException if the license is not valid (or present) for ES|QL CCS search.
+     */
+    public static void checkForCcsLicense(
+        List<TableInfo> indices,
+        IndicesExpressionGrouper indicesGrouper,
+        XPackLicenseState licenseState
+    ) {
+        for (TableInfo tableInfo : indices) {
+            Map<String, OriginalIndices> groupedIndices;
+            try {
+                groupedIndices = indicesGrouper.groupIndices(IndicesOptions.DEFAULT, tableInfo.id().index());
+            } catch (NoSuchRemoteClusterException e) {
+                if (EsqlLicenseChecker.isCcsAllowed(licenseState)) {
+                    throw e;
+                } else {
+                    throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
+                }
+            }
+            // check if it is a cross-cluster query
+            if (groupedIndices.size() > 1 || groupedIndices.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false) {
+                if (EsqlLicenseChecker.isCcsAllowed(licenseState) == false) {
+                    throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
+                }
+            }
+        }
+    }
 }

+ 158 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java

@@ -8,10 +8,18 @@
 package org.elasticsearch.xpack.esql.session;
 
 import org.apache.lucene.index.CorruptIndexException;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
 import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.index.IndexMode;
+import org.elasticsearch.indices.IndicesExpressionGrouper;
+import org.elasticsearch.license.License;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.license.internal.XPackLicenseStatus;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.NoSeedNodeLeftException;
@@ -20,9 +28,11 @@ import org.elasticsearch.transport.RemoteClusterAware;
 import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
+import org.elasticsearch.xpack.esql.analysis.TableInfo;
 import org.elasticsearch.xpack.esql.core.type.EsField;
 import org.elasticsearch.xpack.esql.index.EsIndex;
 import org.elasticsearch.xpack.esql.index.IndexResolution;
+import org.elasticsearch.xpack.esql.plan.TableIdentifier;
 import org.elasticsearch.xpack.esql.type.EsFieldTests;
 
 import java.util.ArrayList;
@@ -32,8 +42,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.LongSupplier;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
+import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
+import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.checkForCcsLicense;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -627,4 +641,148 @@ public class EsqlSessionCCSUtilsTests extends ESTestCase {
         }
 
     }
+
+    public void testCheckForCcsLicense() {
+        final TestIndicesExpressionGrouper indicesGrouper = new TestIndicesExpressionGrouper();
+
+        // this seems to be used only for tracking usage of features, not for checking if a license is expired
+        final LongSupplier currTime = () -> System.currentTimeMillis();
+
+        XPackLicenseState enterpriseLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.ENTERPRISE));
+        XPackLicenseState trialLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.TRIAL));
+        XPackLicenseState platinumLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.PLATINUM));
+        XPackLicenseState goldLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.GOLD));
+        XPackLicenseState basicLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.BASIC));
+        XPackLicenseState standardLicenseValid = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.STANDARD));
+        XPackLicenseState missingLicense = new XPackLicenseState(currTime, activeLicenseStatus(License.OperationMode.MISSING));
+        XPackLicenseState nullLicense = null;
+
+        final XPackLicenseStatus enterpriseStatus = inactiveLicenseStatus(License.OperationMode.ENTERPRISE);
+        XPackLicenseState enterpriseLicenseInactive = new XPackLicenseState(currTime, enterpriseStatus);
+        XPackLicenseState trialLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.TRIAL));
+        XPackLicenseState platinumLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.PLATINUM));
+        XPackLicenseState goldLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.GOLD));
+        XPackLicenseState basicLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.BASIC));
+        XPackLicenseState standardLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.STANDARD));
+        XPackLicenseState missingLicenseInactive = new XPackLicenseState(currTime, inactiveLicenseStatus(License.OperationMode.MISSING));
+
+        // local only search does not require an enterprise license
+        {
+            List<TableInfo> indices = new ArrayList<>();
+            indices.add(new TableInfo(new TableIdentifier(EMPTY, null, randomFrom("idx", "idx1,idx2*"))));
+
+            checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseValid);
+            checkForCcsLicense(indices, indicesGrouper, platinumLicenseValid);
+            checkForCcsLicense(indices, indicesGrouper, goldLicenseValid);
+            checkForCcsLicense(indices, indicesGrouper, trialLicenseValid);
+            checkForCcsLicense(indices, indicesGrouper, basicLicenseValid);
+            checkForCcsLicense(indices, indicesGrouper, standardLicenseValid);
+            checkForCcsLicense(indices, indicesGrouper, missingLicense);
+            checkForCcsLicense(indices, indicesGrouper, nullLicense);
+
+            checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseInactive);
+            checkForCcsLicense(indices, indicesGrouper, platinumLicenseInactive);
+            checkForCcsLicense(indices, indicesGrouper, goldLicenseInactive);
+            checkForCcsLicense(indices, indicesGrouper, trialLicenseInactive);
+            checkForCcsLicense(indices, indicesGrouper, basicLicenseInactive);
+            checkForCcsLicense(indices, indicesGrouper, standardLicenseInactive);
+            checkForCcsLicense(indices, indicesGrouper, missingLicenseInactive);
+        }
+
+        // cross-cluster search requires a valid (active, non-expired) enterprise license OR a valid trial license
+        {
+            List<TableInfo> indices = new ArrayList<>();
+            final String indexExprWithRemotes = randomFrom("remote:idx", "idx1,remote:idx2*,remote:logs,c*:idx4");
+            if (randomBoolean()) {
+                indices.add(new TableInfo(new TableIdentifier(EMPTY, null, indexExprWithRemotes)));
+            } else {
+                indices.add(new TableInfo(new TableIdentifier(EMPTY, null, randomFrom("idx", "idx1,idx2*"))));
+                indices.add(new TableInfo(new TableIdentifier(EMPTY, null, indexExprWithRemotes)));
+            }
+
+            // licenses that work
+            checkForCcsLicense(indices, indicesGrouper, enterpriseLicenseValid);
+            checkForCcsLicense(indices, indicesGrouper, trialLicenseValid);
+
+            // all others fail ---
+
+            // active non-expired non-Enterprise non-Trial licenses
+            assertLicenseCheckFails(indices, indicesGrouper, platinumLicenseValid, "active platinum license");
+            assertLicenseCheckFails(indices, indicesGrouper, goldLicenseValid, "active gold license");
+            assertLicenseCheckFails(indices, indicesGrouper, basicLicenseValid, "active basic license");
+            assertLicenseCheckFails(indices, indicesGrouper, standardLicenseValid, "active standard license");
+            assertLicenseCheckFails(indices, indicesGrouper, missingLicense, "active missing license");
+            assertLicenseCheckFails(indices, indicesGrouper, nullLicense, "none");
+
+            // inactive/expired licenses
+            assertLicenseCheckFails(indices, indicesGrouper, enterpriseLicenseInactive, "expired enterprise license");
+            assertLicenseCheckFails(indices, indicesGrouper, trialLicenseInactive, "expired trial license");
+            assertLicenseCheckFails(indices, indicesGrouper, platinumLicenseInactive, "expired platinum license");
+            assertLicenseCheckFails(indices, indicesGrouper, goldLicenseInactive, "expired gold license");
+            assertLicenseCheckFails(indices, indicesGrouper, basicLicenseInactive, "expired basic license");
+            assertLicenseCheckFails(indices, indicesGrouper, standardLicenseInactive, "expired standard license");
+            assertLicenseCheckFails(indices, indicesGrouper, missingLicenseInactive, "expired missing license");
+        }
+    }
+
+    private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) {
+        return new XPackLicenseStatus(operationMode, true, null);
+    }
+
+    private XPackLicenseStatus inactiveLicenseStatus(License.OperationMode operationMode) {
+        return new XPackLicenseStatus(operationMode, false, "License Expired 123");
+    }
+
+    private void assertLicenseCheckFails(
+        List<TableInfo> indices,
+        TestIndicesExpressionGrouper indicesGrouper,
+        XPackLicenseState licenseState,
+        String expectedErrorMessageSuffix
+    ) {
+        ElasticsearchStatusException e = expectThrows(
+            ElasticsearchStatusException.class,
+            () -> checkForCcsLicense(indices, indicesGrouper, licenseState)
+        );
+        assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
+        assertThat(
+            e.getMessage(),
+            equalTo(
+                "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: " + expectedErrorMessageSuffix
+            )
+        );
+    }
+
+    static class TestIndicesExpressionGrouper implements IndicesExpressionGrouper {
+        @Override
+        public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indexExpressions) {
+            final Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
+            final String localKey = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY;
+
+            for (String expr : indexExpressions) {
+                assertFalse(Strings.isNullOrBlank(expr));
+                String[] split = expr.split(":", 2);
+                assertTrue("Bad index expression: " + expr, split.length < 3);
+                String clusterAlias;
+                String indexExpr;
+                if (split.length == 1) {
+                    clusterAlias = localKey;
+                    indexExpr = expr;
+                } else {
+                    clusterAlias = split[0];
+                    indexExpr = split[1];
+
+                }
+                OriginalIndices currIndices = originalIndicesMap.get(clusterAlias);
+                if (currIndices == null) {
+                    originalIndicesMap.put(clusterAlias, new OriginalIndices(new String[] { indexExpr }, indicesOptions));
+                } else {
+                    List<String> indicesList = Arrays.stream(currIndices.indices()).collect(Collectors.toList());
+                    indicesList.add(indexExpr);
+                    originalIndicesMap.put(clusterAlias, new OriginalIndices(indicesList.toArray(new String[0]), indicesOptions));
+                }
+            }
+            return originalIndicesMap;
+        }
+    }
+
 }

+ 35 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.remotecluster;
 
+import org.apache.http.client.methods.HttpGet;
 import org.elasticsearch.Build;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
@@ -22,6 +23,7 @@ import org.elasticsearch.core.Tuple;
 import org.elasticsearch.test.cluster.ElasticsearchCluster;
 import org.elasticsearch.test.cluster.util.resource.Resource;
 import org.elasticsearch.test.junit.RunnableTestRuleAdapter;
+import org.elasticsearch.test.rest.ObjectPath;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
 import org.junit.After;
@@ -34,6 +36,7 @@ import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Base64;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -51,6 +54,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
 
 public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTestCase {
     private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
@@ -342,6 +346,14 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe
         configureRemoteCluster();
         populateData();
 
+        Map<String, Object> esqlCcsLicenseFeatureUsage = fetchEsqlCcsFeatureUsageFromNode(client());
+
+        Object ccsLastUsedTimestampAtStartOfTest = null;
+        if (esqlCcsLicenseFeatureUsage.isEmpty() == false) {
+            // some test runs will have a usage value already, so capture that to compare at end of test
+            ccsLastUsedTimestampAtStartOfTest = esqlCcsLicenseFeatureUsage.get("last_used");
+        }
+
         // query remote cluster only
         Request request = esqlRequest("""
             FROM my_remote_cluster:employees
@@ -385,6 +397,15 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe
             | LIMIT 2
             | KEEP emp_id, department"""));
         assertRemoteOnlyAgainst2IndexResults(response);
+
+        // check that the esql-ccs license feature is now present and that the last_used field has been updated
+        esqlCcsLicenseFeatureUsage = fetchEsqlCcsFeatureUsageFromNode(client());
+        assertThat(esqlCcsLicenseFeatureUsage.size(), equalTo(5));
+        Object lastUsed = esqlCcsLicenseFeatureUsage.get("last_used");
+        assertNotNull("lastUsed should not be null", lastUsed);
+        if (ccsLastUsedTimestampAtStartOfTest != null) {
+            assertThat(lastUsed.toString(), not(equalTo(ccsLastUsedTimestampAtStartOfTest.toString())));
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -1660,4 +1681,18 @@ public class RemoteClusterSecurityEsqlIT extends AbstractRemoteClusterSecurityTe
             assertThat((int) shards.get("failed"), is(0));
         }
     }
+
+    private static Map<String, Object> fetchEsqlCcsFeatureUsageFromNode(RestClient client) throws IOException {
+        Request request = new Request(HttpGet.METHOD_NAME, "_license/feature_usage");
+        request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue(USER, PASS)));
+        Response response = client.performRequest(request);
+        ObjectPath path = ObjectPath.createFromResponse(response);
+        List<Map<String, Object>> features = path.evaluate("features");
+        for (var feature : features) {
+            if ("esql-ccs".equals(feature.get("name"))) {
+                return feature;
+            }
+        }
+        return Collections.emptyMap();
+    }
 }

+ 11 - 52
x-pack/qa/multi-cluster-search-security/legacy-with-basic-license/src/test/resources/rest-api-spec/test/querying_cluster/80_esql.yml

@@ -86,11 +86,12 @@ teardown:
         ignore: 404
 
 ---
-"Index data and search on the mixed cluster":
+"ES|QL cross-cluster query fails with basic license":
   - skip:
       features: allowed_warnings
 
   - do:
+      catch: bad_request
       allowed_warnings:
         - "Line 1:21: Square brackets '[]' need to be removed in FROM METADATA declaration"
       headers: { Authorization: "Basic am9lOnMza3JpdC1wYXNzd29yZA==" }
@@ -98,23 +99,11 @@ teardown:
         body:
           query: 'FROM *:esql*,esql_* | STATS total = sum(cost) by tag | SORT tag | LIMIT 10'
 
-  - match: {columns.0.name: "total"}
-  - match: {columns.0.type: "long"}
-  - match: {columns.1.name: "tag"}
-  - match: {columns.1.type: "keyword"}
-
-  - match: {values.0.0: 2200}
-  - match: {values.0.1: "computer"}
-  - match: {values.1.0: 170}
-  - match: {values.1.1: "headphone"}
-  - match: {values.2.0: 2100 }
-  - match: {values.2.1: "laptop" }
-  - match: {values.3.0: 1000 }
-  - match: {values.3.1: "monitor" }
-  - match: {values.4.0: 550 }
-  - match: {values.4.1: "tablet" }
+  - match: { error.type: "status_exception" }
+  - match: { error.reason: "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: active basic license" }
 
   - do:
+      catch: bad_request
       allowed_warnings:
         - "Line 1:21: Square brackets '[]' need to be removed in FROM METADATA declaration"
       headers: { Authorization: "Basic am9lOnMza3JpdC1wYXNzd29yZA==" }
@@ -128,28 +117,11 @@ teardown:
                 lte: "2023-01-03"
                 format: "yyyy-MM-dd"
 
-  - match: {columns.0.name: "_index"}
-  - match: {columns.0.type: "keyword"}
-  - match: {columns.1.name: "tag"}
-  - match: {columns.1.type: "keyword"}
-  - match: {columns.2.name: "cost" }
-  - match: {columns.2.type: "long" }
-
-  - match: {values.0.0: "esql_local"}
-  - match: {values.0.1: "monitor"}
-  - match: {values.0.2: 250 }
-  - match: {values.1.0: "my_remote_cluster:esql_index" }
-  - match: {values.1.1: "tablet"}
-  - match: {values.1.2: 450 }
-  - match: {values.2.0: "my_remote_cluster:esql_index" }
-  - match: {values.2.1: "computer" }
-  - match: {values.2.2: 1200 }
-  - match: {values.3.0: "esql_local"}
-  - match: {values.3.1: "laptop" }
-  - match: {values.3.2: 2100 }
+  - match: { error.type: "status_exception" }
+  - match: { error.reason: "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: active basic license" }
 
 ---
-"Enrich across clusters":
+"ES|QL enrich query across clusters fails with basic license":
   - requires:
       cluster_features: ["gte_v8.13.0"]
       reason: "Enrich across clusters available in 8.13 or later"
@@ -194,27 +166,14 @@ teardown:
         index: suggestions
 
   - do:
+      catch: bad_request
       headers: { Authorization: "Basic am9lOnMza3JpdC1wYXNzd29yZA==" }
       esql.query:
         body:
           query: 'FROM *:esql*,esql_* | STATS total = sum(cost) by tag | SORT total DESC | LIMIT 3 | ENRICH suggestions | KEEP tag, total, phrase'
 
-  - match: {columns.0.name: "tag"}
-  - match: {columns.0.type: "keyword"}
-  - match: {columns.1.name: "total" }
-  - match: {columns.1.type: "long" }
-  - match: {columns.2.name: "phrase" }
-  - match: {columns.2.type: "keyword" }
-
-  - match: {values.0.0: "computer"}
-  - match: {values.0.1: 2200}
-  - match: {values.0.2: "best desktop for programming"}
-  - match: {values.1.0: "laptop"}
-  - match: {values.1.1: 2100 }
-  - match: {values.1.2: "the best battery life laptop"}
-  - match: {values.2.0: "monitor" }
-  - match: {values.2.1: 1000 }
-  - match: {values.2.2: "4k or 5k or 6K monitor?" }
+  - match: { error.type: "status_exception" }
+  - match: { error.reason: "A valid Enterprise license is required to run ES|QL cross-cluster searches. License found: active basic license" }
 
   - do:
       enrich.delete_policy: