Browse Source

EQL: Add the ability for EQL to perform CCSes (#74399)

This introduces the ability for EQL to perform searches on a remote cluster, leveraging ES's CCS capabilities.

The remote cluster needs to be on the same version as the local cluster.
Bogdan Pintea 4 years ago
parent
commit
398ebef2c1
36 changed files with 605 additions and 91 deletions
  1. 11 27
      x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/BaseEqlSpecTestCase.java
  2. 7 1
      x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlDateNanosSpecTestCase.java
  3. 7 1
      x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlExtraSpecTestCase.java
  4. 13 11
      x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlRestTestCase.java
  5. 11 9
      x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlRestValidationTestCase.java
  6. 7 1
      x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlSpecTestCase.java
  7. 133 0
      x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/RemoteClusterAwareEqlRestTestCase.java
  8. 53 0
      x-pack/plugin/eql/qa/multi-cluster-with-security/build.gradle
  9. 20 0
      x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/EqlDateNanosIT.java
  10. 20 0
      x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/EqlExtraIT.java
  11. 20 0
      x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/EqlRestIT.java
  12. 41 0
      x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/EqlRestValidationIT.java
  13. 20 0
      x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/EqlSpecIT.java
  14. 25 0
      x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/RemoteClusterTestUtils.java
  15. 7 0
      x-pack/plugin/eql/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/eql/EqlRestValidationIT.java
  16. 1 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java
  17. 18 9
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java
  18. 3 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/EventPayload.java
  19. 3 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/HitReference.java
  20. 3 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java
  21. 2 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java
  22. 1 2
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java
  23. 4 1
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java
  24. 6 3
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java
  25. 48 0
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/util/RemoteClusterRegistry.java
  26. 19 0
      x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/util/SearchHitUtils.java
  27. 5 2
      x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java
  28. 33 6
      x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/CancellationTests.java
  29. 36 0
      x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/VerifierTests.java
  30. 1 2
      x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/optimizer/TomlFoldTests.java
  31. 1 0
      x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/IndexResolver.java
  32. 9 1
      x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/session/Configuration.java
  33. 4 3
      x-pack/plugin/ql/test-fixtures/src/main/java/org/elasticsearch/xpack/ql/TestUtils.java
  34. 2 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java
  35. 2 1
      x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlConfiguration.java
  36. 9 5
      x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/CancellationTests.java

+ 11 - 27
x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/BaseEqlSpecTestCase.java

@@ -14,7 +14,6 @@ import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.eql.EqlSearchRequest;
 import org.elasticsearch.client.eql.EqlSearchResponse;
@@ -24,21 +23,18 @@ import org.elasticsearch.client.eql.EqlSearchResponse.Sequence;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
-import org.elasticsearch.test.rest.ESRestTestCase;
 import org.junit.AfterClass;
 import org.junit.Before;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.StringJoiner;
 
 import static java.util.stream.Collectors.toList;
 
-public abstract class BaseEqlSpecTestCase extends ESRestTestCase {
+public abstract class BaseEqlSpecTestCase extends RemoteClusterAwareEqlRestTestCase {
 
     protected static final String PARAM_FORMATTING = "%2$s";
 
@@ -51,15 +47,16 @@ public abstract class BaseEqlSpecTestCase extends ESRestTestCase {
 
     @Before
     public void setup() throws Exception {
-        if (client().performRequest(new Request("HEAD", "/" + index)).getStatusLine().getStatusCode() == 404) {
-            DataLoader.loadDatasetIntoEs(highLevelClient(), this::createParser);
+        RestClient provisioningClient = provisioningClient();
+        if (provisioningClient.performRequest(new Request("HEAD", "/" + unqualifiedIndexName())).getStatusLine().getStatusCode() == 404) {
+            DataLoader.loadDatasetIntoEs(highLevelClient(provisioningClient), this::createParser);
         }
     }
 
     @AfterClass
     public static void wipeTestData() throws IOException {
         try {
-            adminClient().performRequest(new Request("DELETE", "/*"));
+            provisioningAdminClient().performRequest(new Request("DELETE", "/*"));
         } catch (ResponseException e) {
             // 404 here just means we had no indexes
             if (e.getResponse().getStatusLine().getStatusCode() != 404) {
@@ -144,12 +141,7 @@ public abstract class BaseEqlSpecTestCase extends ESRestTestCase {
 
     private RestHighLevelClient highLevelClient() {
         if (highLevelClient == null) {
-            highLevelClient = new RestHighLevelClient(
-                    client(),
-                    ignore -> {
-                    },
-                    Collections.emptyList()) {
-            };
+            highLevelClient = highLevelClient(client());
         }
         return highLevelClient;
     }
@@ -204,17 +196,7 @@ public abstract class BaseEqlSpecTestCase extends ESRestTestCase {
 
     @Override
     protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
-        RestClientBuilder builder = RestClient.builder(hosts);
-        configureClient(builder, settings);
-
-        int timeout = Math.toIntExact(timeout().millis());
-        builder.setRequestConfigCallback(
-            requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(timeout)
-                .setConnectionRequestTimeout(timeout)
-                .setSocketTimeout(timeout)
-        );
-        builder.setStrictDeprecationMode(true);
-        return builder.build();
+        return clientBuilder(settings, hosts);
     }
 
     protected String timestamp() {
@@ -240,7 +222,9 @@ public abstract class BaseEqlSpecTestCase extends ESRestTestCase {
         return randomBoolean() ? "head" : "tail";
     }
 
-    protected TimeValue timeout() {
-        return TimeValue.timeValueSeconds(10);
+    // strip any qualification from the received index string
+    private String unqualifiedIndexName() {
+        int offset = index.indexOf(':');
+        return offset >= 0 ? index.substring(offset + 1) : index;
     }
 }

+ 7 - 1
x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlDateNanosSpecTestCase.java

@@ -21,8 +21,14 @@ public abstract class EqlDateNanosSpecTestCase extends BaseEqlSpecTestCase {
         return asArray(EqlSpecLoader.load("/test_queries_date_nanos.toml", new HashSet<>()));
     }
 
+    // constructor for "local" rest tests
     public EqlDateNanosSpecTestCase(String query, String name, long[] eventIds) {
-        super(DATE_NANOS_INDEX, query, name, eventIds);
+        this(DATE_NANOS_INDEX, query, name, eventIds);
+    }
+
+    // constructor for multi-cluster tests
+    public EqlDateNanosSpecTestCase(String index, String query, String name, long[] eventIds) {
+        super(index, query, name, eventIds);
     }
 
     @Override

+ 7 - 1
x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlExtraSpecTestCase.java

@@ -21,8 +21,14 @@ public abstract class EqlExtraSpecTestCase extends BaseEqlSpecTestCase {
         return asArray(EqlSpecLoader.load("/test_extra.toml", new HashSet<>()));
     }
 
+    // constructor for "local" rest tests
     public EqlExtraSpecTestCase(String query, String name, long[] eventIds) {
-        super(TEST_EXTRA_INDEX, query, name, eventIds);
+        this(TEST_EXTRA_INDEX, query, name, eventIds);
+    }
+
+    // constructor for multi-cluster tests
+    public EqlExtraSpecTestCase(String index, String query, String name, long[] eventIds) {
+        super(index, query, name, eventIds);
     }
 
     @Override

+ 13 - 11
x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlRestTestCase.java

@@ -10,10 +10,8 @@ import org.apache.http.util.EntityUtils;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
-import org.elasticsearch.test.rest.ESRestTestCase;
 import org.junit.After;
 
 import java.io.IOException;
@@ -27,7 +25,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 
-public abstract class EqlRestTestCase extends ESRestTestCase {
+public abstract class EqlRestTestCase extends RemoteClusterAwareEqlRestTestCase {
 
     private static final String defaultValidationIndexName = "eql_search_validation_test";
     private static final String validQuery = "process where user = \\\"SYSTEM\\\"";
@@ -49,11 +47,11 @@ public abstract class EqlRestTestCase extends ESRestTestCase {
     };
 
     public void testBadRequests() throws Exception {
-        createIndex(defaultValidationIndexName, Settings.EMPTY);
+        createIndex(defaultValidationIndexName, (String) null);
 
         final String contentType = "application/json";
         for (String[] test : testBadRequests) {
-            final String endpoint = "/" + defaultValidationIndexName + "/_eql/search";
+            final String endpoint = "/" + indexPattern(defaultValidationIndexName) + "/_eql/search";
             Request request = new Request("GET", endpoint);
             request.setJsonEntity(test[0]);
 
@@ -70,8 +68,8 @@ public abstract class EqlRestTestCase extends ESRestTestCase {
 
     @SuppressWarnings("unchecked")
     public void testIndexWildcardPatterns() throws Exception {
-        createIndex("test1", Settings.EMPTY, null, "\"my_alias\" : {}, \"test_alias\" : {}");
-        createIndex("test2", Settings.EMPTY, null, "\"my_alias\" : {}");
+        createIndex("test1", "\"my_alias\" : {}, \"test_alias\" : {}");
+        createIndex("test2", "\"my_alias\" : {}");
 
         StringBuilder bulk = new StringBuilder();
         bulk.append("{\"index\": {\"_index\": \"test1\", \"_id\": 1}}\n");
@@ -86,7 +84,7 @@ public abstract class EqlRestTestCase extends ESRestTestCase {
         };
 
         for (String indexPattern : wildcardRequests) {
-            String endpoint = "/" + indexPattern + "/_eql/search";
+            String endpoint = "/" + indexPattern(indexPattern) + "/_eql/search";
             Request request = new Request("GET", endpoint);
             request.setJsonEntity("{\"query\":\"process where true\"}");
             Response response = client().performRequest(request);
@@ -108,7 +106,7 @@ public abstract class EqlRestTestCase extends ESRestTestCase {
 
     @SuppressWarnings("unchecked")
     public void testUnicodeChars() throws Exception {
-        createIndex("test", Settings.EMPTY, null, null);
+        createIndex("test", (String) null);
 
         StringBuilder bulk = new StringBuilder();
         bulk.append("{\"index\": {\"_index\": \"test\", \"_id\": 1}}\n");
@@ -117,7 +115,7 @@ public abstract class EqlRestTestCase extends ESRestTestCase {
         bulk.append("{\"event\":{\"category\":\"process\"},\"@timestamp\":\"2020-09-05T12:34:57Z\",\"log\" : \"prefix_𖠋_suffix\"}\n");
         bulkIndex(bulk.toString());
 
-        String endpoint = "/test/_eql/search";
+        String endpoint = "/" + indexPattern("test") + "/_eql/search";
         Request request = new Request("GET", endpoint);
         request.setJsonEntity("{\"query\":\"process where log==\\\"prefix_\\\\u{0eb}_suffix\\\"\"}");
         Response response = client().performRequest(request);
@@ -150,9 +148,13 @@ public abstract class EqlRestTestCase extends ESRestTestCase {
         bulkRequest.setJsonEntity(bulk);
         bulkRequest.addParameter("refresh", "true");
 
-        Response response = client().performRequest(bulkRequest);
+        Response response = provisioningClient().performRequest(bulkRequest);
         assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
         String bulkResponse = EntityUtils.toString(response.getEntity());
         assertThat(bulkResponse, not(containsString("\"errors\": true")));
     }
+
+    protected String indexPattern(String pattern) {
+        return pattern;
+    }
 }

+ 11 - 9
x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlRestValidationTestCase.java

@@ -11,10 +11,8 @@ import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
-import org.elasticsearch.test.rest.ESRestTestCase;
 import org.junit.Before;
 
 import java.io.IOException;
@@ -24,7 +22,7 @@ import static org.elasticsearch.xpack.ql.util.StringUtils.EMPTY;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
-public abstract class EqlRestValidationTestCase extends ESRestTestCase {
+public abstract class EqlRestValidationTestCase extends RemoteClusterAwareEqlRestTestCase {
 
     private static final String indexName = "test_eql";
     protected static final String[] existentIndexWithWildcard = new String[] {indexName + ",inexistent*", indexName + "*,inexistent*",
@@ -35,8 +33,8 @@ public abstract class EqlRestValidationTestCase extends ESRestTestCase {
 
     @Before
     public void prepareIndices() throws IOException {
-        if (client().performRequest(new Request("HEAD", "/" + indexName)).getStatusLine().getStatusCode() == 404) {
-            createIndex(indexName, Settings.EMPTY);
+        if (provisioningClient().performRequest(new Request("HEAD", "/" + indexName)).getStatusLine().getStatusCode() == 404) {
+            createIndex(indexName, (String) null);
         }
 
         Object[] fieldsAndValues = new Object[] {"event_type", "my_event", "@timestamp", "2020-10-08T12:35:48Z", "val", 0};
@@ -47,9 +45,9 @@ public abstract class EqlRestValidationTestCase extends ESRestTestCase {
         document.endObject();
         final Request request = new Request("POST", "/" + indexName + "/_doc/" + 0);
         request.setJsonEntity(Strings.toString(document));
-        assertOK(client().performRequest(request));
+        assertOK(provisioningClient().performRequest(request));
 
-        assertOK(adminClient().performRequest(new Request("POST", "/" + indexName + "/_refresh")));
+        assertOK(provisioningAdminClient().performRequest(new Request("POST", "/" + indexName + "/_refresh")));
     }
 
     protected abstract String getInexistentIndexErrorMessage();
@@ -82,7 +80,7 @@ public abstract class EqlRestValidationTestCase extends ESRestTestCase {
 
     protected void assertErrorMessages(String[] indices, String reqParameter, String errorMessage) throws IOException {
         for (String indexName : indices) {
-            assertErrorMessage(indexName, reqParameter, errorMessage + "[" + indexName + "]");
+            assertErrorMessage(indexName, reqParameter, errorMessage + "[" + indexPattern(indexName) + "]");
         }
     }
 
@@ -95,7 +93,7 @@ public abstract class EqlRestValidationTestCase extends ESRestTestCase {
     }
 
     private Request createRequest(String indexName, String reqParameter) throws IOException {
-        final Request request = new Request("POST", "/" + indexName + "/_eql/search" + reqParameter);
+        final Request request = new Request("POST", "/" + indexPattern(indexName) + "/_eql/search" + reqParameter);
         request.setJsonEntity(Strings.toString(JsonXContent.contentBuilder()
             .startObject()
             .field("event_category_field", "event_type")
@@ -112,4 +110,8 @@ public abstract class EqlRestValidationTestCase extends ESRestTestCase {
             assertOK(response);
         }
     }
+
+    protected String indexPattern(String index) {
+        return index;
+    }
 }

+ 7 - 1
x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/EqlSpecTestCase.java

@@ -34,7 +34,13 @@ public abstract class EqlSpecTestCase extends BaseEqlSpecTestCase {
         return "serial_event_id";
     }
 
+    // constructor for "local" rest tests
     public EqlSpecTestCase(String query, String name, long[] eventIds) {
-        super(TEST_INDEX, query, name, eventIds);
+        this(TEST_INDEX, query, name, eventIds);
+    }
+
+    // constructor for multi-cluster tests
+    public EqlSpecTestCase(String index, String query, String name, long[] eventIds) {
+        super(index, query, name, eventIds);
     }
 }

+ 133 - 0
x-pack/plugin/eql/qa/common/src/main/java/org/elasticsearch/test/eql/RemoteClusterAwareEqlRestTestCase.java

@@ -0,0 +1,133 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.test.eql;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.SecureString;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.internal.io.IOUtils;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.elasticsearch.common.Strings.hasText;
+
+public abstract class RemoteClusterAwareEqlRestTestCase extends ESRestTestCase {
+
+    // client used for loading data on a remote cluster only.
+    private static RestClient remoteClient;
+
+    @BeforeClass
+    public static void initRemoteClients() throws IOException {
+        String crossClusterHost = System.getProperty("tests.rest.cluster.remote.host"); // gradle defined
+        if (crossClusterHost != null) {
+            int portSeparator = crossClusterHost.lastIndexOf(':');
+            if (portSeparator < 0) {
+                throw new IllegalArgumentException("Illegal cluster url [" + crossClusterHost + "]");
+            }
+            String host = crossClusterHost.substring(0, portSeparator);
+            int port = Integer.parseInt(crossClusterHost.substring(portSeparator + 1));
+            HttpHost[] remoteHttpHosts = new HttpHost[] { new HttpHost(host, port) };
+
+            remoteClient = clientBuilder(secureRemoteClientSettings(), remoteHttpHosts);
+        }
+    }
+
+    @AfterClass
+    public static void closeRemoteClients() throws IOException {
+        try {
+            IOUtils.close(remoteClient);
+        } finally {
+            remoteClient = null;
+        }
+    }
+
+    protected static RestHighLevelClient highLevelClient(RestClient client) {
+        return new RestHighLevelClient(
+                client,
+                ignore -> {
+                },
+                Collections.emptyList()) {
+        };
+    }
+
+    protected static RestClient clientBuilder(Settings settings, HttpHost[] hosts) throws IOException {
+        RestClientBuilder builder = RestClient.builder(hosts);
+        configureClient(builder, settings);
+
+        int timeout = Math.toIntExact(timeout().millis());
+        builder.setRequestConfigCallback(
+            requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(timeout)
+                .setConnectionRequestTimeout(timeout)
+                .setSocketTimeout(timeout)
+        );
+        builder.setStrictDeprecationMode(true);
+        return builder.build();
+    }
+
+    protected static TimeValue timeout() {
+        return TimeValue.timeValueSeconds(10);
+    }
+
+    // returned client is used to load the test data, either in the local cluster (for rest/javaRestTests) or a remote one (for
+    // multi-cluster). note: the client()/adminClient() will always connect to the local cluster.
+    protected static RestClient provisioningClient() {
+        return remoteClient == null ? client() : remoteClient;
+    }
+
+    protected static RestClient provisioningAdminClient() {
+        return remoteClient == null ? adminClient() : remoteClient;
+    }
+
+    protected static void createIndex(String name, String aliases) throws IOException {
+        Settings settings = Settings.EMPTY;
+        Request request = new Request("PUT", "/" + name);
+        String entity = "{\"settings\": " + Strings.toString(settings);
+        if (aliases != null) {
+            entity += ",\"aliases\": {" + aliases + "}";
+        }
+        entity += "}";
+        if (settings.getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) == false) {
+            expectSoftDeletesWarning(request, name);
+        }
+        request.setJsonEntity(entity);
+        provisioningClient().performRequest(request);
+    }
+
+    protected static void deleteIndex(String name) throws IOException {
+        deleteIndex(provisioningClient(), name);
+    }
+
+    @Override
+    protected Settings restClientSettings() {
+        return secureRemoteClientSettings();
+    }
+
+    protected static Settings secureRemoteClientSettings() {
+        String user = System.getProperty("tests.rest.cluster.remote.user"); // gradle defined
+        String pass = System.getProperty("tests.rest.cluster.remote.password");
+        if (hasText(user) && hasText(pass)) {
+            String token = basicAuthHeaderValue(user, new SecureString(pass.toCharArray()));
+            return Settings.builder()
+                .put(ThreadContext.PREFIX + ".Authorization", token)
+                .build();
+        }
+        return Settings.EMPTY;
+    }
+}

+ 53 - 0
x-pack/plugin/eql/qa/multi-cluster-with-security/build.gradle

@@ -0,0 +1,53 @@
+import org.elasticsearch.gradle.testclusters.DefaultTestClustersTask
+
+apply plugin: 'elasticsearch.internal-testclusters'
+apply plugin: 'elasticsearch.standalone-rest-test'
+apply plugin: 'elasticsearch.rest-test'
+
+dependencies {
+  testImplementation project(path: xpackModule('eql:qa:common'))
+}
+
+testClusters {
+  'remote-cluster' {
+    testDistribution = 'DEFAULT'
+    numberOfNodes = 2
+    setting 'node.roles', '[data,ingest,master]'
+    setting 'xpack.ml.enabled', 'false'
+    setting 'xpack.watcher.enabled', 'false'
+    setting 'xpack.security.enabled', 'true'
+
+    user username: "test_user", password: "x-pack-test-password"
+  }
+
+  'integTest' {
+    testDistribution = 'DEFAULT'
+    setting 'xpack.ml.enabled', 'false'
+    setting 'xpack.watcher.enabled', 'false'
+    setting 'cluster.remote.my_remote_cluster.seeds', {
+      testClusters.'remote-cluster'.getAllTransportPortURI().collect { "\"$it\"" }.toString()
+    }
+    setting 'cluster.remote.connections_per_cluster', "1"
+    setting 'xpack.security.enabled', 'true'
+
+    user username: "test_user", password: "x-pack-test-password"
+  }
+}
+
+tasks.register("startRemoteCluster", DefaultTestClustersTask.class) {
+  useCluster testClusters.'remote-cluster'
+  doLast {
+    "Starting remote cluster before integ tests and integTest cluster is started"
+  }
+}
+
+tasks.named("integTest").configure {
+  dependsOn 'startRemoteCluster'
+  useCluster testClusters.'remote-cluster'
+  doFirst {
+    nonInputProperties.systemProperty 'tests.rest.cluster.remote.host', "${-> testClusters.'remote-cluster'.getAllHttpSocketURI().get(0)}"
+    nonInputProperties.systemProperty 'tests.rest.cluster.remote.user', "test_user"
+    nonInputProperties.systemProperty 'tests.rest.cluster.remote.password', "x-pack-test-password"
+  }
+}
+tasks.named("check").configure {dependsOn("integTest") } // run these tests as part of the "check" task

+ 20 - 0
x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/EqlDateNanosIT.java

@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.eql;
+
+import org.elasticsearch.test.eql.EqlDateNanosSpecTestCase;
+
+import static org.elasticsearch.test.eql.DataLoader.DATE_NANOS_INDEX;
+import static org.elasticsearch.xpack.eql.RemoteClusterTestUtils.remoteClusterIndex;
+
+public class EqlDateNanosIT extends EqlDateNanosSpecTestCase {
+
+    public EqlDateNanosIT(String query, String name, long[] eventIds) {
+        super(remoteClusterIndex(DATE_NANOS_INDEX), query, name, eventIds);
+    }
+}

+ 20 - 0
x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/EqlExtraIT.java

@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.eql;
+
+import org.elasticsearch.test.eql.EqlExtraSpecTestCase;
+
+import static org.elasticsearch.test.eql.DataLoader.TEST_EXTRA_INDEX;
+import static org.elasticsearch.xpack.eql.RemoteClusterTestUtils.remoteClusterIndex;
+
+public class EqlExtraIT extends EqlExtraSpecTestCase {
+
+    public EqlExtraIT(String query, String name, long[] eventIds) {
+        super(remoteClusterIndex(TEST_EXTRA_INDEX), query, name, eventIds);
+    }
+}

+ 20 - 0
x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/EqlRestIT.java

@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.eql;
+
+import org.elasticsearch.test.eql.EqlRestTestCase;
+
+import static org.elasticsearch.xpack.eql.RemoteClusterTestUtils.remoteClusterPattern;
+
+public class EqlRestIT extends EqlRestTestCase {
+
+    @Override
+    protected String indexPattern(String pattern) {
+        return remoteClusterPattern(pattern);
+    }
+}

+ 41 - 0
x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/EqlRestValidationIT.java

@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.eql;
+
+import org.elasticsearch.test.eql.EqlRestValidationTestCase;
+
+import java.io.IOException;
+
+import static org.elasticsearch.xpack.eql.RemoteClusterTestUtils.remoteClusterPattern;
+
+public class EqlRestValidationIT extends EqlRestValidationTestCase {
+
+    @Override
+    protected String getInexistentIndexErrorMessage() {
+        return "\"caused_by\":{\"type\":\"verification_exception\",\"reason\":\"Found 1 problem\\nline -1:-1: Unknown index ";
+    }
+
+    protected void assertErrorMessageWhenAllowNoIndicesIsFalse(String reqParameter) throws IOException {
+        assertErrorMessage("inexistent1*", reqParameter, "\"root_cause\":[{\"type\":\"index_not_found_exception\","
+            + "\"reason\":\"no such index [inexistent1*]\"");
+        assertErrorMessage("inexistent1*,inexistent2*", reqParameter, "\"root_cause\":[{\"type\":\"index_not_found_exception\","
+            + "\"reason\":\"no such index [inexistent1*]\"");
+        assertErrorMessage("test_eql,inexistent*", reqParameter, "\"root_cause\":[{\"type\":\"index_not_found_exception\","
+            + "\"reason\":\"no such index [inexistent*]\"");
+        //TODO: revisit the next two tests when https://github.com/elastic/elasticsearch/issues/64190 is closed
+        assertErrorMessage("inexistent", reqParameter, "\"root_cause\":[{\"type\":\"index_not_found_exception\","
+            + "\"reason\":\"no such index [[inexistent]]\"");
+        assertErrorMessage("inexistent1,inexistent2", reqParameter, "\"root_cause\":[{\"type\":\"index_not_found_exception\","
+            + "\"reason\":\"no such index [[inexistent1, inexistent2]]\"");
+    }
+
+    @Override
+    protected String indexPattern(String pattern) {
+        return remoteClusterPattern(pattern);
+    }
+}

+ 20 - 0
x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/EqlSpecIT.java

@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.eql;
+
+import org.elasticsearch.test.eql.EqlSpecTestCase;
+
+import static org.elasticsearch.test.eql.DataLoader.TEST_INDEX;
+import static org.elasticsearch.xpack.eql.RemoteClusterTestUtils.remoteClusterIndex;
+
+public class EqlSpecIT extends EqlSpecTestCase {
+
+    public EqlSpecIT(String query, String name, long[] eventIds) {
+        super(remoteClusterIndex(TEST_INDEX), query, name, eventIds);
+    }
+}

+ 25 - 0
x-pack/plugin/eql/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/eql/RemoteClusterTestUtils.java

@@ -0,0 +1,25 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.eql;
+
+public class RemoteClusterTestUtils {
+    public static final String REMOTE_CLUSTER_NAME = "my_remote_cluster"; // gradle defined
+
+    public static String remoteClusterIndex(String indexName) {
+        return REMOTE_CLUSTER_NAME + ":" + indexName;
+    }
+
+    public static String remoteClusterPattern(String pattern) {
+        StringBuilder sb = new StringBuilder();
+        for (String index: pattern.split(",")) {
+            sb.append(remoteClusterIndex(index));
+            sb.append(',');
+        }
+        return sb.substring(0, sb.length() - 1);
+    }
+}

+ 7 - 0
x-pack/plugin/eql/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/eql/EqlRestValidationIT.java

@@ -1,3 +1,10 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
 package org.elasticsearch.xpack.eql;
 
 import org.elasticsearch.test.eql.EqlRestValidationTestCase;

+ 1 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java

@@ -55,7 +55,7 @@ public class Analyzer extends RuleExecutor<LogicalPlan> {
     }
 
     private LogicalPlan verify(LogicalPlan plan) {
-        Collection<Failure> failures = verifier.verify(plan);
+        Collection<Failure> failures = verifier.verify(plan, configuration.versionIncompatibleClusters());
         if (failures.isEmpty() == false) {
             throw new VerificationException(failures);
         }

+ 18 - 9
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.eql.analysis;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.xpack.eql.plan.logical.Head;
 import org.elasticsearch.xpack.eql.plan.logical.Join;
 import org.elasticsearch.xpack.eql.plan.logical.KeyedFilter;
@@ -19,8 +20,8 @@ import org.elasticsearch.xpack.ql.common.Failure;
 import org.elasticsearch.xpack.ql.expression.Attribute;
 import org.elasticsearch.xpack.ql.expression.NamedExpression;
 import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
+import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
 import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
-import org.elasticsearch.xpack.ql.tree.Node;
 import org.elasticsearch.xpack.ql.type.DataTypes;
 import org.elasticsearch.xpack.ql.util.StringUtils;
 
@@ -29,10 +30,9 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 
-import static java.util.stream.Collectors.toMap;
 import static org.elasticsearch.xpack.eql.stats.FeatureMetric.EVENT;
 import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN;
 import static org.elasticsearch.xpack.eql.stats.FeatureMetric.JOIN_KEYS_FIVE_OR_MORE;
@@ -69,12 +69,7 @@ public class Verifier {
         this.metrics = metrics;
     }
 
-    public Map<Node<?>, String> verifyFailures(LogicalPlan plan) {
-        Collection<Failure> failures = verify(plan);
-        return failures.stream().collect(toMap(Failure::node, Failure::message));
-    }
-
-    Collection<Failure> verify(LogicalPlan plan) {
+    Collection<Failure> verify(LogicalPlan plan, Function<String, Collection<String>> versionIncompatibleClusters) {
         Set<Failure> failures = new LinkedHashSet<>();
 
         // start bottom-up
@@ -157,6 +152,7 @@ public class Verifier {
 
                 checkFilterConditionType(p, localFailures);
                 checkJoinKeyTypes(p, localFailures);
+                checkRemoteClusterOnSameVersion(p, versionIncompatibleClusters, localFailures);
                 // mark the plan as analyzed
                 // if everything checks out
                 if (failures.isEmpty()) {
@@ -278,4 +274,17 @@ public class Verifier {
             ));
         }
     }
+
+    private void checkRemoteClusterOnSameVersion(LogicalPlan plan, Function<String, Collection<String>> versionIncompatibleClusters,
+                                                 Collection<Failure> localFailures) {
+        if (plan instanceof EsRelation) {
+            EsRelation esRelation = (EsRelation) plan;
+            Collection<String> incompatibleClusters = versionIncompatibleClusters.apply(esRelation.index().name());
+            if (incompatibleClusters.size() > 0) {
+                localFailures.add(fail(esRelation, "the following remote cluster{} incompatible, being on a version different than local "
+                    + "cluster's [{}]: {}", incompatibleClusters.size() > 1 ? "s are" : " is", Version.CURRENT,
+                    incompatibleClusters));
+            }
+        }
+    }
 }

+ 3 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/payload/EventPayload.java

@@ -15,6 +15,8 @@ import org.elasticsearch.xpack.eql.execution.search.RuntimeUtils;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.elasticsearch.xpack.eql.util.SearchHitUtils.qualifiedIndex;
+
 public class EventPayload extends AbstractPayload {
 
     private final List<Event> values;
@@ -25,7 +27,7 @@ public class EventPayload extends AbstractPayload {
         List<SearchHit> hits = RuntimeUtils.searchHits(response);
         values = new ArrayList<>(hits.size());
         for (SearchHit hit : hits) {
-            values.add(new Event(hit.getIndex(), hit.getId(), hit.getSourceRef(), hit.getFields()));
+            values.add(new Event(qualifiedIndex(hit), hit.getId(), hit.getSourceRef(), hit.getFields()));
         }
     }
 

+ 3 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/search/HitReference.java

@@ -11,6 +11,8 @@ import org.elasticsearch.search.SearchHit;
 
 import java.util.Objects;
 
+import static org.elasticsearch.xpack.eql.util.SearchHitUtils.qualifiedIndex;
+
 public class HitReference {
 
     private final String index;
@@ -22,7 +24,7 @@ public class HitReference {
     }
 
     public HitReference(SearchHit hit) {
-        this(hit.getIndex(), hit.getId());
+        this(qualifiedIndex(hit), hit.getId());
     }
 
     public String index() {

+ 3 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/SequencePayload.java

@@ -15,6 +15,8 @@ import org.elasticsearch.xpack.eql.execution.payload.AbstractPayload;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.elasticsearch.xpack.eql.util.SearchHitUtils.qualifiedIndex;
+
 class SequencePayload extends AbstractPayload {
 
     private final List<org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence> values;
@@ -28,7 +30,7 @@ class SequencePayload extends AbstractPayload {
             List<SearchHit> hits = docs.get(i);
             List<Event> events = new ArrayList<>(hits.size());
             for (SearchHit hit : hits) {
-                events.add(new Event(hit.getIndex(), hit.getId(), hit.getSourceRef(), hit.getFields()));
+                events.add(new Event(qualifiedIndex(hit), hit.getId(), hit.getSourceRef(), hit.getFields()));
             }
             values.add(new org.elasticsearch.xpack.eql.action.EqlSearchResponse.Sequence(s.key().asList(), events));
         }

+ 2 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/sequence/TumblingWindow.java

@@ -35,6 +35,7 @@ import java.util.Set;
 import static java.util.stream.Collectors.toList;
 import static org.elasticsearch.action.ActionListener.wrap;
 import static org.elasticsearch.xpack.eql.execution.search.RuntimeUtils.searchHits;
+import static org.elasticsearch.xpack.eql.util.SearchHitUtils.qualifiedIndex;
 
 /**
  * Time-based window encapsulating query creation and advancement.
@@ -629,7 +630,7 @@ public class TumblingWindow implements Executable {
                     SearchHit hit = delegate.next();
                     SequenceKey k = key(criterion.key(hit));
                     Ordinal o = criterion.ordinal(hit);
-                    return new Tuple<>(new KeyAndOrdinal(k, o), new HitReference(cache(hit.getIndex()), hit.getId()));
+                    return new Tuple<>(new KeyAndOrdinal(k, o), new HitReference(cache(qualifiedIndex(hit)), hit.getId()));
                 }
             };
         };

+ 1 - 2
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java

@@ -65,8 +65,7 @@ public class EqlPlugin extends Plugin implements ActionPlugin {
         return createComponents(client, clusterService.getClusterName().value(), namedWriteableRegistry);
     }
 
-    private Collection<Object> createComponents(Client client, String clusterName,
-                                                NamedWriteableRegistry namedWriteableRegistry) {
+    private Collection<Object> createComponents(Client client, String clusterName, NamedWriteableRegistry namedWriteableRegistry) {
         IndexResolver indexResolver = new IndexResolver(client, clusterName, DefaultDataTypeRegistry.INSTANCE);
         PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, namedWriteableRegistry);
         return Arrays.asList(planExecutor);

+ 4 - 1
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java

@@ -39,6 +39,7 @@ import org.elasticsearch.xpack.eql.execution.PlanExecutor;
 import org.elasticsearch.xpack.eql.parser.ParserParams;
 import org.elasticsearch.xpack.eql.session.EqlConfiguration;
 import org.elasticsearch.xpack.eql.session.Results;
+import org.elasticsearch.xpack.eql.util.RemoteClusterRegistry;
 import org.elasticsearch.xpack.ql.async.AsyncTaskManagementService;
 import org.elasticsearch.xpack.ql.expression.Order;
 
@@ -132,9 +133,11 @@ public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRe
             .size(request.size())
             .fetchSize(request.fetchSize());
 
+        RemoteClusterRegistry remoteClusterRegistry = new RemoteClusterRegistry(transportService.getRemoteClusterService(),
+            request.indicesOptions());
         EqlConfiguration cfg = new EqlConfiguration(request.indices(), zoneId, username, clusterName, filter,
                 request.runtimeMappings(), fetchFields, timeout, request.indicesOptions(), request.fetchSize(),
-                clientId, new TaskId(nodeId, task.getId()), task);
+                clientId, new TaskId(nodeId, task.getId()), task, remoteClusterRegistry::versionIncompatibleClusters);
         executeRequestWithRetryAttempt(clusterService, listener::onFailure,
             onFailure -> planExecutor.eql(cfg, request.query(), params,
                 wrap(r -> listener.onResponse(createResponse(r, task.getExecutionId())), onFailure)),

+ 6 - 3
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlConfiguration.java

@@ -8,8 +8,8 @@
 package org.elasticsearch.xpack.eql.session;
 
 import org.elasticsearch.action.support.IndicesOptions;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.search.fetch.subphase.FieldAndFormat;
@@ -17,8 +17,10 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xpack.eql.action.EqlSearchTask;
 
 import java.time.ZoneId;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configuration {
 
@@ -39,8 +41,9 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu
 
     public EqlConfiguration(String[] indices, ZoneId zi, String username, String clusterName, QueryBuilder filter,
                             Map<String, Object> runtimeMappings, List<FieldAndFormat> fetchFields, TimeValue requestTimeout,
-                            IndicesOptions indicesOptions, int fetchSize, String clientId, TaskId taskId, EqlSearchTask task) {
-        super(zi, username, clusterName);
+                            IndicesOptions indicesOptions, int fetchSize, String clientId, TaskId taskId, EqlSearchTask task,
+                            Function<String, Collection<String>> versionIncompatibleClusters) {
+        super(zi, username, clusterName, versionIncompatibleClusters);
 
         this.indices = indices;
         this.filter = filter;

+ 48 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/util/RemoteClusterRegistry.java

@@ -0,0 +1,48 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.eql.util;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.OriginalIndices;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.transport.RemoteClusterAware;
+import org.elasticsearch.transport.RemoteClusterService;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class RemoteClusterRegistry {
+
+    private final RemoteClusterService remoteClusterService;
+    private final IndicesOptions indicesOptions;
+
+    public RemoteClusterRegistry(RemoteClusterService remoteClusterService, IndicesOptions indicesOptions) {
+        this.remoteClusterService = remoteClusterService;
+        this.indicesOptions = indicesOptions;
+    }
+
+    public Set<String> versionIncompatibleClusters(String indexPattern) {
+        Set<String> incompatibleClusters = new TreeSet<>();
+        for (String clusterAlias: indicesPerRemoteCluster(indexPattern).keySet()) {
+            Version clusterVersion = remoteClusterService.getConnection(clusterAlias).getVersion();
+            if (clusterVersion.equals(Version.CURRENT) == false) { // TODO: should newer clusters be eventually allowed?
+                incompatibleClusters.add(clusterAlias);
+            }
+        }
+        return incompatibleClusters;
+    }
+
+    private Map<String, OriginalIndices> indicesPerRemoteCluster(String indexPattern) {
+        Map<String, OriginalIndices> indicesMap = remoteClusterService.groupIndices(indicesOptions,
+            Strings.splitStringByCommaToArray(indexPattern));
+        indicesMap.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
+        return indicesMap;
+    }
+}

+ 19 - 0
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/util/SearchHitUtils.java

@@ -0,0 +1,19 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.eql.util;
+
+import org.elasticsearch.search.SearchHit;
+
+import static org.elasticsearch.transport.RemoteClusterAware.buildRemoteIndexName;
+
+public final class SearchHitUtils {
+
+    public static String qualifiedIndex(SearchHit hit) {
+        return buildRemoteIndexName(hit.getClusterAlias(), hit.getIndex());
+    }
+}

+ 5 - 2
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/EqlTestUtils.java

@@ -23,6 +23,8 @@ import org.elasticsearch.xpack.eql.expression.predicate.operator.comparison.Inse
 import org.elasticsearch.xpack.eql.session.EqlConfiguration;
 import org.elasticsearch.xpack.ql.expression.Expression;
 
+import java.util.Collections;
+
 import static java.util.Collections.emptyMap;
 import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
 import static org.elasticsearch.test.ESTestCase.randomBoolean;
@@ -41,7 +43,7 @@ public final class EqlTestUtils {
 
     public static final EqlConfiguration TEST_CFG = new EqlConfiguration(new String[] {"none"},
             org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, emptyMap(), null,
-            TimeValue.timeValueSeconds(30), null, 123, "", new TaskId("test", 123), null);
+            TimeValue.timeValueSeconds(30), null, 123, "", new TaskId("test", 123), null, x -> Collections.emptySet());
 
     public static EqlConfiguration randomConfiguration() {
         return new EqlConfiguration(new String[]{randomAlphaOfLength(16)},
@@ -56,7 +58,8 @@ public final class EqlTestUtils {
             randomIntBetween(1, 1000),
             randomAlphaOfLength(16),
             new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()),
-            randomTask());
+            randomTask(),
+            x -> Collections.emptySet());
     }
 
     public static EqlSearchTask randomTask() {

+ 33 - 6
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/CancellationTests.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.eql.analysis;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.fieldcaps.FieldCapabilities;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
@@ -18,9 +19,13 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.eql.action.EqlSearchRequest;
 import org.elasticsearch.xpack.eql.action.EqlSearchResponse;
@@ -29,6 +34,8 @@ import org.elasticsearch.xpack.eql.execution.PlanExecutor;
 import org.elasticsearch.xpack.eql.plugin.TransportEqlSearchAction;
 import org.elasticsearch.xpack.ql.index.IndexResolver;
 import org.elasticsearch.xpack.ql.type.DefaultDataTypeRegistry;
+import org.junit.After;
+import org.junit.Before;
 import org.mockito.ArgumentCaptor;
 import org.mockito.stubbing.Answer;
 
@@ -36,6 +43,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static java.util.Collections.emptyMap;
@@ -51,17 +59,32 @@ import static org.mockito.Mockito.when;
 
 public class CancellationTests extends ESTestCase {
 
+    private ThreadPool threadPool;
+    private TransportService transportService;
+
+    @Before
+    public void mockTransportService() {
+        threadPool = new TestThreadPool(getClass().getName());
+        // The TransportService needs to be able to return a valid RemoteClusterServices object down the stream, required by the Verifier.
+        transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool);
+    }
+
+    @After
+    public void cleanupTransportService() {
+        ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
+    }
+
     public void testCancellationBeforeFieldCaps() throws InterruptedException {
         Client client = mock(Client.class);
         EqlSearchTask task = mock(EqlSearchTask.class);
         when(task.isCancelled()).thenReturn(true);
         ClusterService mockClusterService = mockClusterService();
 
-        IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE);
+        IndexResolver indexResolver = indexResolver(client);
         PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList()));
         CountDownLatch countDownLatch = new CountDownLatch(1);
         TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().query("foo where blah"), "",
-            mock(TransportService.class), mockClusterService, new ActionListener<>() {
+            transportService, mockClusterService, new ActionListener<>() {
                 @Override
                 public void onResponse(EqlSearchResponse eqlSearchResponse) {
                     fail("Shouldn't be here");
@@ -120,11 +143,11 @@ public class CancellationTests extends ESTestCase {
         }).when(client).fieldCaps(any(), any());
 
 
-        IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE);
+        IndexResolver indexResolver = indexResolver(client);
         PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList()));
         CountDownLatch countDownLatch = new CountDownLatch(1);
         TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().indices("endgame")
-            .query("process where foo==3"), "", mock(TransportService.class), mockClusterService, new ActionListener<>() {
+            .query("process where foo==3"), "", transportService, mockClusterService, new ActionListener<>() {
             @Override
             public void onResponse(EqlSearchResponse eqlSearchResponse) {
                 fail("Shouldn't be here");
@@ -185,11 +208,11 @@ public class CancellationTests extends ESTestCase {
             return null;
         }).when(client).execute(any(), searchRequestCaptor.capture(), any());
 
-        IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE);
+        IndexResolver indexResolver = indexResolver(client);
         PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList()));
         CountDownLatch countDownLatch = new CountDownLatch(1);
         TransportEqlSearchAction.operation(planExecutor, task, new EqlSearchRequest().indices("endgame")
-            .query("process where foo==3"), "", mock(TransportService.class), mockClusterService, new ActionListener<>() {
+            .query("process where foo==3"), "", transportService, mockClusterService, new ActionListener<>() {
             @Override
             public void onResponse(EqlSearchResponse eqlSearchResponse) {
                 fail("Shouldn't be here");
@@ -227,4 +250,8 @@ public class CancellationTests extends ESTestCase {
         when(mockClusterService.getClusterName()).thenReturn(mockClusterName);
         return mockClusterService;
     }
+
+    private static IndexResolver indexResolver(Client client) {
+        return new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE);
+    }
 }

+ 36 - 0
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/analysis/VerifierTests.java

@@ -6,11 +6,15 @@
  */
 package org.elasticsearch.xpack.eql.analysis;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.eql.EqlTestUtils;
 import org.elasticsearch.xpack.eql.expression.function.EqlFunctionRegistry;
 import org.elasticsearch.xpack.eql.parser.EqlParser;
 import org.elasticsearch.xpack.eql.parser.ParsingException;
+import org.elasticsearch.xpack.eql.session.EqlConfiguration;
 import org.elasticsearch.xpack.eql.stats.Metrics;
 import org.elasticsearch.xpack.ql.index.EsIndex;
 import org.elasticsearch.xpack.ql.index.IndexResolution;
@@ -18,7 +22,14 @@ import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
 import org.elasticsearch.xpack.ql.type.EsField;
 import org.elasticsearch.xpack.ql.type.TypesTests;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.Function;
+
+import static java.util.Collections.emptyMap;
 
 public class VerifierTests extends ESTestCase {
 
@@ -370,4 +381,29 @@ public class VerifierTests extends ESTestCase {
                 "[process where true] by opcode"));
     }
 
+    private LogicalPlan analyzeWithVerifierFunction(Function<String, Collection<String>> versionIncompatibleClusters) {
+        PreAnalyzer preAnalyzer = new PreAnalyzer();
+        EqlConfiguration eqlConfiguration = new EqlConfiguration(new String[] {"none"},
+            org.elasticsearch.xpack.ql.util.DateUtils.UTC, "nobody", "cluster", null, emptyMap(), null,
+            TimeValue.timeValueSeconds(30), null, 123, "", new TaskId("test", 123), null, versionIncompatibleClusters);
+        Analyzer analyzer = new Analyzer(eqlConfiguration, new EqlFunctionRegistry(), new Verifier(new Metrics()));
+        IndexResolution resolution = IndexResolution.valid(new EsIndex("irrelevant", loadEqlMapping("mapping-default.json")));
+        return analyzer.analyze(preAnalyzer.preAnalyze(parser.createStatement("any where true"), resolution));
+    }
+
+    public void testRemoteClusterVersionCheck() {
+        assertNotNull(analyzeWithVerifierFunction(x -> Collections.emptySet()));
+
+        Set<String> clusters = new TreeSet<>() {{
+            add("one");
+        }};
+        VerificationException e = expectThrows(VerificationException.class, () -> analyzeWithVerifierFunction(x -> clusters));
+        assertTrue(e.getMessage().contains("the following remote cluster is incompatible, being on a version different than local "
+            + "cluster's [" + Version.CURRENT + "]: [one]"));
+
+        clusters.add("two");
+        e = expectThrows(VerificationException.class, () -> analyzeWithVerifierFunction(x -> clusters));
+        assertTrue(e.getMessage().contains("the following remote clusters are incompatible, being on a version different than local "
+            + "cluster's [" + Version.CURRENT + "]: [one, two]"));
+    }
 }

+ 1 - 2
x-pack/plugin/eql/src/test/java/org/elasticsearch/xpack/eql/optimizer/TomlFoldTests.java

@@ -39,8 +39,7 @@ public class TomlFoldTests extends ESTestCase {
 
     private static final EqlParser PARSER = new EqlParser();
     private static final EqlFunctionRegistry FUNCTION_REGISTRY = new EqlFunctionRegistry();
-    private static final Verifier VERIFIER = new Verifier(new Metrics());
-    private static final Analyzer ANALYZER = new Analyzer(TEST_CFG, FUNCTION_REGISTRY, VERIFIER);
+    private static final Analyzer ANALYZER = new Analyzer(TEST_CFG, FUNCTION_REGISTRY, new Verifier(new Metrics()));
 
     private final int num;
     private final EqlFoldSpec spec;

+ 1 - 0
x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/index/IndexResolver.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ql.index;
 
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
+
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;

+ 9 - 1
x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/session/Configuration.java

@@ -11,6 +11,8 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
+import java.util.Collection;
+import java.util.function.Function;
 
 public class Configuration {
 
@@ -18,13 +20,15 @@ public class Configuration {
     private final String username;
     private final ZonedDateTime now;
     private final ZoneId zoneId;
+    private final Function<String, Collection<String>> versionIncompatibleClusters;
 
-    public Configuration(ZoneId zi, String username, String clusterName) {
+    public Configuration(ZoneId zi, String username, String clusterName, Function<String, Collection<String>> versionIncompatibleClusters) {
         this.zoneId = zi.normalized();
         Clock clock = Clock.system(zoneId);
         this.now = ZonedDateTime.now(Clock.tick(clock, Duration.ofNanos(1)));
         this.username = username;
         this.clusterName = clusterName;
+        this.versionIncompatibleClusters = versionIncompatibleClusters;
     }
 
     public ZoneId zoneId() {
@@ -42,4 +46,8 @@ public class Configuration {
     public String username() {
         return username;
     }
+
+    public Function<String, Collection<String>> versionIncompatibleClusters() {
+        return versionIncompatibleClusters;
+    }
 }

+ 4 - 3
x-pack/plugin/ql/test-fixtures/src/main/java/org/elasticsearch/xpack/ql/TestUtils.java

@@ -58,6 +58,7 @@ import java.nio.file.attribute.BasicFileAttributes;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -82,7 +83,7 @@ import static org.junit.Assert.assertEquals;
 public final class TestUtils {
 
     public static final ZoneId UTC = ZoneId.of("Z");
-    public static final Configuration TEST_CFG = new Configuration(UTC, null, null);
+    public static final Configuration TEST_CFG = new Configuration(UTC, null, null, x -> Collections.emptySet());
 
     private static final String MATCHER_TYPE_CONTAINS = "CONTAINS";
     private static final String MATCHER_TYPE_REGEX = "REGEX";
@@ -90,11 +91,11 @@ public final class TestUtils {
     private TestUtils() {}
 
     public static Configuration randomConfiguration() {
-        return new Configuration(randomZone(), randomAlphaOfLength(10), randomAlphaOfLength(10));
+        return new Configuration(randomZone(), randomAlphaOfLength(10), randomAlphaOfLength(10), x -> Collections.emptySet());
     }
 
     public static Configuration randomConfiguration(ZoneId zoneId) {
-        return new Configuration(zoneId, randomAlphaOfLength(10), randomAlphaOfLength(10));
+        return new Configuration(zoneId, randomAlphaOfLength(10), randomAlphaOfLength(10), x -> Collections.emptySet());
     }
 
     public static Literal of(Object value) {

+ 2 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/IndicesAndAliasesResolver.java

@@ -302,7 +302,8 @@ class IndicesAndAliasesResolver {
     static boolean allowsRemoteIndices(IndicesRequest request) {
         return request instanceof SearchRequest || request instanceof FieldCapabilitiesRequest
                 || request instanceof GraphExploreRequest || request instanceof ResolveIndexAction.Request
-                || request instanceof OpenPointInTimeRequest;
+                || request instanceof OpenPointInTimeRequest
+                || request.getClass().getCanonicalName().equals("org.elasticsearch.xpack.eql.action.EqlSearchRequest");
     }
 
     private List<String> loadAuthorizedAliases(Set<String> authorizedIndices, Metadata metadata) {

+ 2 - 1
x-pack/plugin/sql/src/main/java/org/elasticsearch/xpack/sql/session/SqlConfiguration.java

@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.sql.proto.Protocol;
 import org.elasticsearch.xpack.sql.proto.SqlVersion;
 
 import java.time.ZoneId;
+import java.util.Collections;
 import java.util.Map;
 
 // Typed object holding properties for a given query
@@ -55,7 +56,7 @@ public class SqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu
                          @Nullable SqlQueryTask task,
                          TimeValue waitForCompletionTimeout, boolean keepOnCompletion, TimeValue keepAlive) {
 
-        super(zi, username, clusterName);
+        super(zi, username, clusterName, x -> Collections.emptySet());
 
         this.pageSize = pageSize;
         this.requestTimeout = requestTimeout;

+ 9 - 5
x-pack/plugin/sql/src/test/java/org/elasticsearch/xpack/sql/analysis/CancellationTests.java

@@ -22,6 +22,8 @@ import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.ql.index.IndexResolver;
+import org.elasticsearch.xpack.ql.type.DefaultDataTypeRegistry;
 import org.elasticsearch.xpack.sql.action.SqlQueryAction;
 import org.elasticsearch.xpack.sql.action.SqlQueryRequest;
 import org.elasticsearch.xpack.sql.action.SqlQueryRequestBuilder;
@@ -29,8 +31,6 @@ import org.elasticsearch.xpack.sql.action.SqlQueryResponse;
 import org.elasticsearch.xpack.sql.action.SqlQueryTask;
 import org.elasticsearch.xpack.sql.execution.PlanExecutor;
 import org.elasticsearch.xpack.sql.plugin.TransportSqlQueryAction;
-import org.elasticsearch.xpack.ql.index.IndexResolver;
-import org.elasticsearch.xpack.ql.type.DefaultDataTypeRegistry;
 import org.mockito.ArgumentCaptor;
 import org.mockito.stubbing.Answer;
 
@@ -59,7 +59,7 @@ public class CancellationTests extends ESTestCase {
         when(task.isCancelled()).thenReturn(true);
         ClusterService mockClusterService = mockClusterService();
 
-        IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE);
+        IndexResolver indexResolver = indexResolver(client);
         PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList()));
         CountDownLatch countDownLatch = new CountDownLatch(1);
         SqlQueryRequest request = new SqlQueryRequestBuilder(client, SqlQueryAction.INSTANCE).query("SELECT foo FROM bar").request();
@@ -122,7 +122,7 @@ public class CancellationTests extends ESTestCase {
         }).when(client).fieldCaps(any(), any());
 
 
-        IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE);
+        IndexResolver indexResolver = indexResolver(client);
         PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList()));
         CountDownLatch countDownLatch = new CountDownLatch(1);
         SqlQueryRequest request = new SqlQueryRequestBuilder(client, SqlQueryAction.INSTANCE)
@@ -188,7 +188,7 @@ public class CancellationTests extends ESTestCase {
             return null;
         }).when(client).execute(any(), searchRequestCaptor.capture(), any());
 
-        IndexResolver indexResolver = new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE);
+        IndexResolver indexResolver = indexResolver(client);
         PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, new NamedWriteableRegistry(Collections.emptyList()));
         SqlQueryRequest request = new SqlQueryRequestBuilder(client, SqlQueryAction.INSTANCE)
             .query("SELECT foo FROM " + indices[0]).request();
@@ -231,4 +231,8 @@ public class CancellationTests extends ESTestCase {
         when(mockClusterService.getClusterName()).thenReturn(mockClusterName);
         return mockClusterService;
     }
+
+    private static IndexResolver indexResolver(Client client) {
+        return new IndexResolver(client, randomAlphaOfLength(10), DefaultDataTypeRegistry.INSTANCE);
+    }
 }