Pārlūkot izejas kodu

[8.x] [Failure Store] Prevent usage of :: selectors with cross-cluster expressions (#125252) (#125831)

* [Failure Store] Prevent usage of :: selectors with cross-cluster expressions (#125252)

The CCS is currently not supported for failure store backing indices.
This PR adjusts the selector parsing (introduced in #118614) to prevent
using `::failures` and `::data` selectors with cross-cluster expressions.

For example, `GET my_remote_cluster:logs-*::failures/_search` request
will fail early, during expression parsing.
To test manually, run `./gradlew run-ccs` and execute the example request.

(cherry picked from commit 1d6c6a59c86de232c65a29579a12e0fdb0eb97ad)

# Conflicts:
#	server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java
#	x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java

* backport also test assertion fix
Slobodan Adamović 7 mēneši atpakaļ
vecāks
revīzija
fd7a4b4739

+ 21 - 3
server/src/main/java/org/elasticsearch/cluster/metadata/IndexNameExpressionResolver.java

@@ -2127,9 +2127,10 @@ public class IndexNameExpressionResolver {
             int lastDoubleColon = expression.lastIndexOf(SELECTOR_SEPARATOR);
             if (lastDoubleColon >= 0) {
                 String suffix = expression.substring(lastDoubleColon + SELECTOR_SEPARATOR.length());
-                doValidateSelectorString(() -> expression, suffix);
+                IndexComponentSelector selector = resolveAndValidateSelectorString(() -> expression, suffix);
                 String expressionBase = expression.substring(0, lastDoubleColon);
                 ensureNoMoreSelectorSeparators(expressionBase, expression);
+                ensureNotMixingRemoteClusterExpressionWithSelectorSeparator(expressionBase, selector, expression);
                 return bindFunction.apply(expressionBase, suffix);
             }
             // Otherwise accept the default
@@ -2137,10 +2138,10 @@ public class IndexNameExpressionResolver {
         }
 
         public static void validateIndexSelectorString(String indexName, String suffix) {
-            doValidateSelectorString(() -> indexName + SELECTOR_SEPARATOR + suffix, suffix);
+            resolveAndValidateSelectorString(() -> indexName + SELECTOR_SEPARATOR + suffix, suffix);
         }
 
-        private static void doValidateSelectorString(Supplier<String> expression, String suffix) {
+        private static IndexComponentSelector resolveAndValidateSelectorString(Supplier<String> expression, String suffix) {
             IndexComponentSelector selector = IndexComponentSelector.getByKey(suffix);
             if (selector == null) {
                 throw new InvalidIndexNameException(
@@ -2148,6 +2149,7 @@ public class IndexNameExpressionResolver {
                     "invalid usage of :: separator, [" + suffix + "] is not a recognized selector"
                 );
             }
+            return selector;
         }
 
         /**
@@ -2178,6 +2180,22 @@ public class IndexNameExpressionResolver {
                 );
             }
         }
+
+        /**
+         * Checks the expression for remote cluster pattern and throws an exception if it is combined with :: selectors.
+         * @throws InvalidIndexNameException if remote cluster pattern is detected after parsing the selector expression
+         */
+        private static void ensureNotMixingRemoteClusterExpressionWithSelectorSeparator(
+            String expressionWithoutSelector,
+            IndexComponentSelector selector,
+            String originalExpression
+        ) {
+            if (selector != null) {
+                if (RemoteClusterAware.isRemoteIndexName(expressionWithoutSelector)) {
+                    throw new InvalidIndexNameException(originalExpression, "Selectors are not yet supported on remote cluster patterns");
+                }
+            }
+        }
     }
 
     /**

+ 41 - 5
server/src/test/java/org/elasticsearch/cluster/metadata/SelectorResolverTests.java

@@ -18,10 +18,13 @@ import org.elasticsearch.indices.InvalidIndexNameException;
 import org.elasticsearch.indices.SystemIndices;
 import org.elasticsearch.test.ESTestCase;
 
+import java.util.Set;
+
 import static org.elasticsearch.action.support.IndexComponentSelector.DATA;
 import static org.elasticsearch.action.support.IndexComponentSelector.FAILURES;
 import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.Context;
 import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
@@ -73,16 +76,49 @@ public class SelectorResolverTests extends ESTestCase {
         // === Corner Cases
         // Empty index name is not necessarily disallowed, but will be filtered out in the next steps of resolution
         assertThat(resolve(selectorsAllowed, "::data"), equalTo(new ResolvedExpression("", DATA)));
-        // Remote cluster syntax is respected, even if code higher up the call stack is likely to already have handled it already
-        assertThat(resolve(selectorsAllowed, "cluster:index::data"), equalTo(new ResolvedExpression("cluster:index", DATA)));
-        // CCS with an empty index name is not necessarily disallowed, though other code in the resolution logic will likely throw
-        assertThat(resolve(selectorsAllowed, "cluster:::data"), equalTo(new ResolvedExpression("cluster:", DATA)));
-        // Same for empty cluster and index names
+        assertThat(resolve(selectorsAllowed, "::failures"), equalTo(new ResolvedExpression("", FAILURES)));
+        // CCS with an empty index and cluster name is not necessarily disallowed, though other code in the resolution logic will likely
+        // throw
         assertThat(resolve(selectorsAllowed, ":::data"), equalTo(new ResolvedExpression(":", DATA)));
+        assertThat(resolve(selectorsAllowed, ":::failures"), equalTo(new ResolvedExpression(":", FAILURES)));
         // Any more prefix colon characters will trigger the multiple separators error logic
         expectThrows(InvalidIndexNameException.class, () -> resolve(selectorsAllowed, "::::data"));
+        expectThrows(InvalidIndexNameException.class, () -> resolve(selectorsAllowed, "::::failures"));
+        expectThrows(InvalidIndexNameException.class, () -> resolve(selectorsAllowed, ":::::failures"));
         // Suffix case is not supported because there is no component named with the empty string
         expectThrows(InvalidIndexNameException.class, () -> resolve(selectorsAllowed, "index::"));
+
+        // remote cluster syntax is not allowed with :: selectors
+        final Set<String> remoteClusterExpressionsWithSelectors = Set.of(
+            "cluster:index::failures",
+            "cluster-*:index::failures",
+            "cluster-*:index-*::failures",
+            "cluster-*:*::failures",
+            "*:index-*::failures",
+            "*:*::failures",
+            "*:-test*,*::failures",
+            "cluster:::failures",
+            "failures:index::failures",
+            "data:index::failures",
+            "failures:failures::failures",
+            "data:data::failures",
+            "cluster:index::data",
+            "cluster-*:index::data",
+            "cluster-*:index-*::data",
+            "cluster-*:*::data",
+            "*:index-*::data",
+            "*:*::data",
+            "cluster:::data",
+            "failures:index::data",
+            "data:index::data",
+            "failures:failures::data",
+            "data:data::data",
+            "*:-test*,*::data"
+        );
+        for (String expression : remoteClusterExpressionsWithSelectors) {
+            var e = expectThrows(InvalidIndexNameException.class, () -> resolve(selectorsAllowed, expression));
+            assertThat(e.getMessage(), containsString("Selectors are not yet supported on remote cluster patterns"));
+        }
     }
 
     public void testResolveMatchAllToSelectors() {

+ 7 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java

@@ -636,7 +636,13 @@ public class StatementParserTests extends AbstractStatementParserTests {
                 expectDoubleColonErrorWithLineNumber(command, "*:*::failures", parseLineNumber + 3);
 
                 // Too many colons
-                expectInvalidIndexNameErrorWithLineNumber(command, "\"index:::data\"", lineNumber, "index:", "must not contain ':'");
+                expectInvalidIndexNameErrorWithLineNumber(
+                    command,
+                    "\"index:::data\"",
+                    lineNumber,
+                    "index:::data",
+                    "Selectors are not yet supported on remote cluster patterns"
+                );
                 expectInvalidIndexNameErrorWithLineNumber(
                     command,
                     "\"index::::data\"",

+ 177 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityFailureStoreRestIT.java

@@ -0,0 +1,177 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.remotecluster;
+
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchResponseUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+abstract class AbstractRemoteClusterSecurityFailureStoreRestIT extends AbstractRemoteClusterSecurityTestCase {
+
+    protected void assertSearchResponseContainsIndices(Response response, String... expectedIndices) throws IOException {
+        assertOK(response);
+        final SearchResponse searchResponse = SearchResponseUtils.parseSearchResponse(responseAsParser(response));
+        try {
+            final List<String> actualIndices = Arrays.stream(searchResponse.getHits().getHits())
+                .map(SearchHit::getIndex)
+                .collect(Collectors.toList());
+            assertThat(actualIndices, containsInAnyOrder(expectedIndices));
+        } finally {
+            searchResponse.decRef();
+        }
+    }
+
+    protected void setupTestDataStreamOnFulfillingCluster() throws IOException {
+        // Create data stream and index some documents
+        final Request createComponentTemplate = new Request("PUT", "/_component_template/component1");
+        createComponentTemplate.setJsonEntity("""
+            {
+                "template": {
+                    "mappings": {
+                        "properties": {
+                            "@timestamp": {
+                                "type": "date"
+                            },
+                            "age": {
+                                "type": "integer"
+                            },
+                            "email": {
+                                "type": "keyword"
+                            },
+                            "name": {
+                                "type": "text"
+                            }
+                        }
+                    },
+                    "data_stream_options": {
+                        "failure_store": {
+                            "enabled": true
+                        }
+                    }
+                }
+            }""");
+        assertOK(performRequestAgainstFulfillingCluster(createComponentTemplate));
+
+        final Request createTemplate = new Request("PUT", "/_index_template/template1");
+        createTemplate.setJsonEntity("""
+            {
+                "index_patterns": ["test*"],
+                "data_stream": {},
+                "priority": 500,
+                "composed_of": ["component1"]
+            }""");
+        assertOK(performRequestAgainstFulfillingCluster(createTemplate));
+
+        final Request createDoc1 = new Request("PUT", "/test1/_doc/1?refresh=true&op_type=create");
+        createDoc1.setJsonEntity("""
+            {
+                "@timestamp": 1,
+                "age" : 1,
+                "name" : "jack",
+                "email" : "jack@example.com"
+            }""");
+        assertOK(performRequestAgainstFulfillingCluster(createDoc1));
+
+        final Request createDoc2 = new Request("PUT", "/test1/_doc/2?refresh=true&op_type=create");
+        createDoc2.setJsonEntity("""
+            {
+                "@timestamp": 2,
+                "age" : "this should be an int",
+                "name" : "jack",
+                "email" : "jack@example.com"
+            }""");
+        assertOK(performRequestAgainstFulfillingCluster(createDoc2));
+        {
+            final Request otherTemplate = new Request("PUT", "/_index_template/other_template");
+            otherTemplate.setJsonEntity("""
+                {
+                    "index_patterns": ["other*"],
+                    "data_stream": {},
+                    "priority": 500,
+                    "composed_of": ["component1"]
+                }""");
+            assertOK(performRequestAgainstFulfillingCluster(otherTemplate));
+        }
+        {
+            final Request createOtherDoc3 = new Request("PUT", "/other1/_doc/3?refresh=true&op_type=create");
+            createOtherDoc3.setJsonEntity("""
+                {
+                    "@timestamp": 3,
+                    "age" : 3,
+                    "name" : "jane",
+                    "email" : "jane@example.com"
+                }""");
+            assertOK(performRequestAgainstFulfillingCluster(createOtherDoc3));
+        }
+        {
+            final Request createOtherDoc4 = new Request("PUT", "/other1/_doc/4?refresh=true&op_type=create");
+            createOtherDoc4.setJsonEntity("""
+                {
+                    "@timestamp": 4,
+                    "age" : "this should be an int",
+                    "name" : "jane",
+                    "email" : "jane@example.com"
+                }""");
+            assertOK(performRequestAgainstFulfillingCluster(createOtherDoc4));
+        }
+    }
+
+    protected Response performRequestWithRemoteSearchUser(final Request request) throws IOException {
+        request.setOptions(
+            RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", headerFromRandomAuthMethod(REMOTE_SEARCH_USER, PASS))
+        );
+        return client().performRequest(request);
+    }
+
+    protected Response performRequestWithUser(final String user, final Request request) throws IOException {
+        request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", headerFromRandomAuthMethod(user, PASS)));
+        return client().performRequest(request);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected Tuple<List<String>, List<String>> getDataAndFailureIndices(String dataStreamName) throws IOException {
+        Request dataStream = new Request("GET", "/_data_stream/" + dataStreamName);
+        Response response = performRequestAgainstFulfillingCluster(dataStream);
+        Map<String, Object> dataStreams = entityAsMap(response);
+        List<String> dataIndexNames = (List<String>) XContentMapValues.extractValue("data_streams.indices.index_name", dataStreams);
+        List<String> failureIndexNames = (List<String>) XContentMapValues.extractValue(
+            "data_streams.failure_store.indices.index_name",
+            dataStreams
+        );
+        return new Tuple<>(dataIndexNames, failureIndexNames);
+    }
+
+    protected Tuple<String, String> getSingleDataAndFailureIndices(String dataStreamName) throws IOException {
+        Tuple<List<String>, List<String>> indices = getDataAndFailureIndices(dataStreamName);
+        assertThat(indices.v1().size(), equalTo(1));
+        assertThat(indices.v2().size(), equalTo(1));
+        return new Tuple<>(indices.v1().get(0), indices.v2().get(0));
+    }
+
+    protected static void assertSelectorsNotSupported(ResponseException exception) {
+        assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(403));
+        assertThat(exception.getMessage(), containsString("Selectors are not yet supported on remote cluster patterns"));
+    }
+
+}

+ 564 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRCS1FailureStoreRestIT.java

@@ -0,0 +1,564 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.remotecluster;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.common.settings.SecureString;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.FeatureFlag;
+import org.elasticsearch.test.cluster.local.distribution.DistributionType;
+import org.elasticsearch.test.cluster.util.resource.Resource;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.Locale;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+public class RemoteClusterSecurityRCS1FailureStoreRestIT extends AbstractRemoteClusterSecurityFailureStoreRestIT {
+
+    static {
+        fulfillingCluster = ElasticsearchCluster.local()
+            .distribution(DistributionType.DEFAULT)
+            .name("fulfilling-cluster")
+            .nodes(3)
+            .apply(commonClusterConfig)
+            .feature(FeatureFlag.FAILURE_STORE_ENABLED)
+            .rolesFile(Resource.fromClasspath("roles.yml"))
+            .build();
+
+        queryCluster = ElasticsearchCluster.local()
+            .distribution(DistributionType.DEFAULT)
+            .name("query-cluster")
+            .apply(commonClusterConfig)
+            .feature(FeatureFlag.FAILURE_STORE_ENABLED)
+            .rolesFile(Resource.fromClasspath("roles.yml"))
+            .build();
+    }
+
+    @ClassRule
+    public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
+
+    private static final String ALL_ACCESS = "all_access";
+    private static final String DATA_ACCESS = "data_access";
+    private static final String FAILURE_STORE_ACCESS = "failure_store_access";
+    private static final String MANAGE_FAILURE_STORE_ACCESS = "manage_failure_store_access";
+    private static final String ONLY_READ_FAILURE_STORE_ACCESS = "only_read_failure_store_access";
+    private static final String BACKING_FAILURE_STORE_INDEX_ACCESS = "backing_failure_store_index_access";
+    private static final String BACKING_DATA_INDEX_ACCESS = "backing_data_index_access";
+
+    public void testRCS1CrossClusterSearch() throws Exception {
+        final boolean rcs1Security = true;
+        final boolean isProxyMode = randomBoolean();
+        final boolean skipUnavailable = false; // we want to get actual failures and not skip and get empty results
+        final boolean ccsMinimizeRoundtrips = randomBoolean();
+
+        configureRemoteCluster(REMOTE_CLUSTER_ALIAS, fulfillingCluster, rcs1Security, isProxyMode, skipUnavailable);
+
+        // fulfilling cluster setup
+        setupRoleAndUserOnFulfillingCluster();
+        setupTestDataStreamOnFulfillingCluster();
+
+        // query cluster setup
+        setupLocalDataOnQueryCluster();
+        setupUserAndRoleOnQueryCluster();
+
+        final Tuple<String, String> backingIndices = getSingleDataAndFailureIndices("test1");
+        final String backingDataIndexName = backingIndices.v1();
+        final String backingFailureIndexName = backingIndices.v2();
+
+        final Tuple<String, String> otherBackingIndices = getSingleDataAndFailureIndices("other1");
+        final String otherBackingDataIndexName = otherBackingIndices.v1();
+        final String otherBackingFailureIndexName = otherBackingIndices.v2();
+
+        testCcsWithDataSelectorNotSupported(ccsMinimizeRoundtrips);
+        testCcsWithFailuresSelectorNotSupported(ccsMinimizeRoundtrips);
+        testCcsWithoutSelectorsSupported(backingDataIndexName, ccsMinimizeRoundtrips);
+        testSearchingUnauthorizedIndices(otherBackingFailureIndexName, otherBackingDataIndexName, ccsMinimizeRoundtrips);
+        testSearchingWithAccessToAllIndices(ccsMinimizeRoundtrips, backingDataIndexName, otherBackingDataIndexName);
+        testBackingFailureIndexAccess(ccsMinimizeRoundtrips, backingFailureIndexName);
+        testBackingDataIndexAccess(ccsMinimizeRoundtrips, backingDataIndexName);
+        testSearchingNonExistingIndices(ccsMinimizeRoundtrips);
+        testResolveRemoteClustersIsUnauthorized();
+    }
+
+    private void testBackingDataIndexAccess(boolean ccsMinimizeRoundtrips, String backingDataIndexName) throws IOException {
+        Request dataIndexSearchRequest = new Request(
+            "GET",
+            String.format(
+                Locale.ROOT,
+                "/my_remote_cluster:%s/_search?ccs_minimize_roundtrips=%s",
+                backingDataIndexName,
+                ccsMinimizeRoundtrips
+            )
+        );
+        assertSearchResponseContainsIndices(
+            performRequestWithUser(BACKING_DATA_INDEX_ACCESS, dataIndexSearchRequest),
+            backingDataIndexName
+        );
+    }
+
+    private void testSearchingWithAccessToAllIndices(
+        boolean ccsMinimizeRoundtrips,
+        String backingDataIndexName,
+        String otherBackingDataIndexName
+    ) throws IOException {
+        final boolean alsoSearchLocally = randomBoolean();
+        final Request dataSearchRequest = new Request(
+            "GET",
+            String.format(
+                Locale.ROOT,
+                "/%s%s:%s/_search?ccs_minimize_roundtrips=%s",
+                alsoSearchLocally ? "local_index," : "",
+                randomFrom("my_remote_cluster", "*", "my_remote_*"),
+                "*",
+                ccsMinimizeRoundtrips
+            )
+        );
+        final String[] expectedIndices = alsoSearchLocally
+            ? new String[] { "local_index", backingDataIndexName, otherBackingDataIndexName }
+            : new String[] { backingDataIndexName, otherBackingDataIndexName };
+        assertSearchResponseContainsIndices(performRequestWithUser(ALL_ACCESS, dataSearchRequest), expectedIndices);
+    }
+
+    private void testSearchingNonExistingIndices(boolean ccsMinimizeRoundtrips) {
+        // searching non-existing index without permissions should result in 403
+        {
+            final ResponseException exception = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithUser(
+                    FAILURE_STORE_ACCESS,
+                    new Request(
+                        "GET",
+                        String.format(
+                            Locale.ROOT,
+                            "/my_remote_cluster:%s/_search?ccs_minimize_roundtrips=%s",
+                            "non-existing-no-privileges",
+                            ccsMinimizeRoundtrips
+                        )
+                    )
+                )
+            );
+            final String action = ccsMinimizeRoundtrips ? "indices:data/read/search" : "indices:admin/search/search_shards";
+            assertActionUnauthorized(exception, FAILURE_STORE_ACCESS, action, "non-existing-no-privileges");
+        }
+        // searching non-existing index with permissions should result in 404
+        {
+            final ResponseException exception = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithUser(
+                    FAILURE_STORE_ACCESS,
+                    new Request(
+                        "GET",
+                        String.format(
+                            Locale.ROOT,
+                            "/my_remote_cluster:%s/_search?ccs_minimize_roundtrips=%s",
+                            "non-existing-index",
+                            ccsMinimizeRoundtrips
+                        )
+                    )
+                )
+            );
+            assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(404));
+        }
+    }
+
+    private void testSearchingUnauthorizedIndices(
+        String otherBackingFailureIndexName,
+        String otherBackingDataIndexName,
+        boolean ccsMinimizeRoundtrips
+    ) {
+        // try searching remote index for which user has no access
+        final String indexToSearch = randomFrom("other1", otherBackingFailureIndexName, otherBackingDataIndexName);
+        final ResponseException exception = expectThrows(
+            ResponseException.class,
+            () -> performRequestWithUser(
+                FAILURE_STORE_ACCESS,
+                new Request(
+                    "GET",
+                    String.format(
+                        Locale.ROOT,
+                        "/my_remote_cluster:%s/_search?ccs_minimize_roundtrips=%s",
+                        indexToSearch,
+                        ccsMinimizeRoundtrips
+                    )
+                )
+            )
+        );
+        final String action = ccsMinimizeRoundtrips ? "indices:data/read/search" : "indices:admin/search/search_shards";
+        assertActionUnauthorized(exception, FAILURE_STORE_ACCESS, action, indexToSearch);
+    }
+
+    private void testBackingFailureIndexAccess(boolean ccsMinimizeRoundtrips, String backingFailureIndexName) throws IOException {
+        // direct access to backing failure index is subject to the user's permissions
+        // it might fail in some cases and work in others
+        Request failureIndexSearchRequest = new Request(
+            "GET",
+            String.format(
+                Locale.ROOT,
+                "/my_remote_cluster:%s/_search?ccs_minimize_roundtrips=%s",
+                backingFailureIndexName,
+                ccsMinimizeRoundtrips
+            )
+        );
+
+        // user with access to all should be able to search the backing failure index
+        assertSearchResponseContainsIndices(performRequestWithUser(ALL_ACCESS, failureIndexSearchRequest), backingFailureIndexName);
+
+        // user with data only access should not be able to search the backing failure index
+        {
+            final ResponseException exception = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithUser(DATA_ACCESS, failureIndexSearchRequest)
+            );
+            final String action = ccsMinimizeRoundtrips ? "indices:data/read/search" : "indices:admin/search/search_shards";
+            assertActionUnauthorized(exception, DATA_ACCESS, action, backingFailureIndexName);
+        }
+
+        // for user with access to failure store, it depends on the underlying action that is being sent to the remote cluster
+        if (ccsMinimizeRoundtrips) {
+            // this is a special case where indices:data/read/search will be sent to a remote cluster
+            // and the request to backing failure store index will be authorized based on the datastream
+            // which grants access to backing failure store indices (granted by read_failure_store privilege)
+            // from a security perspective, this is a valid use case and there is no way to prevent this with RCS1 security model
+            // since from the fulfilling cluster perspective this request is no different from any other local search request
+            assertSearchResponseContainsIndices(
+                performRequestWithUser(FAILURE_STORE_ACCESS, failureIndexSearchRequest),
+                backingFailureIndexName
+            );
+        } else {
+            // in this case, the user does not have the necessary permissions to search the backing failure index
+            // the request to failure store backing index is authorized based on the datastream
+            // which does not grant access to the indices:admin/search/search_shards action
+            // this action is granted by read_cross_cluster privilege which is currently
+            // not supporting the failure backing indices (only data backing indices)
+            final ResponseException exception = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithUser(FAILURE_STORE_ACCESS, failureIndexSearchRequest)
+            );
+            assertActionUnauthorized(exception, FAILURE_STORE_ACCESS, "indices:admin/search/search_shards", backingFailureIndexName);
+        }
+
+        // user with manage failure store access should be able to search the backing failure index
+        assertSearchResponseContainsIndices(
+            performRequestWithUser(MANAGE_FAILURE_STORE_ACCESS, failureIndexSearchRequest),
+            backingFailureIndexName
+        );
+
+        assertSearchResponseContainsIndices(
+            performRequestWithUser(BACKING_FAILURE_STORE_INDEX_ACCESS, failureIndexSearchRequest),
+            backingFailureIndexName
+        );
+
+    }
+
+    public void testCcsWithoutSelectorsSupported(String backingDataIndexName, boolean ccsMinimizeRoundtrips) throws IOException {
+        final String[] users = { FAILURE_STORE_ACCESS, DATA_ACCESS };
+        for (String user : users) {
+            final boolean alsoSearchLocally = randomBoolean();
+            final Request dataSearchRequest = new Request(
+                "GET",
+                String.format(
+                    Locale.ROOT,
+                    "/%s%s:%s/_search?ccs_minimize_roundtrips=%s",
+                    alsoSearchLocally ? "local_index," : "",
+                    randomFrom("my_remote_cluster", "*", "my_remote_*"),
+                    randomFrom("test1", "test*", "*", backingDataIndexName),
+                    ccsMinimizeRoundtrips
+                )
+            );
+            final String[] expectedIndices = alsoSearchLocally
+                ? new String[] { "local_index", backingDataIndexName }
+                : new String[] { backingDataIndexName };
+            assertSearchResponseContainsIndices(performRequestWithUser(user, dataSearchRequest), expectedIndices);
+        }
+    }
+
+    private void testCcsWithDataSelectorNotSupported(boolean ccsMinimizeRoundtrips) throws IOException {
+        final String[] users = { FAILURE_STORE_ACCESS, DATA_ACCESS, ALL_ACCESS };
+        for (String user : users) {
+            // query remote cluster using ::data selector should not succeed
+            final boolean alsoSearchLocally = randomBoolean();
+            final Request dataSearchRequest = new Request(
+                "GET",
+                String.format(
+                    Locale.ROOT,
+                    "/%s:%s/_search?ccs_minimize_roundtrips=%s",
+                    randomFrom("my_remote_cluster", "*", "my_remote_*"),
+                    randomFrom("test1::data", "test*::data", "*::data", "non-existing::data"),
+                    ccsMinimizeRoundtrips
+                )
+            );
+            final ResponseException exception = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithUser(user, dataSearchRequest)
+            );
+            assertSelectorsNotSupported(exception);
+        }
+    }
+
+    private void testCcsWithFailuresSelectorNotSupported(boolean ccsMinimizeRoundtrips) {
+        final String[] users = {
+            FAILURE_STORE_ACCESS,
+            DATA_ACCESS,
+            ALL_ACCESS,
+            MANAGE_FAILURE_STORE_ACCESS,
+            BACKING_DATA_INDEX_ACCESS,
+            BACKING_FAILURE_STORE_INDEX_ACCESS,
+            ONLY_READ_FAILURE_STORE_ACCESS };
+        for (String user : users) {
+            // query remote cluster using ::failures selector should fail (regardless of the user's permissions)
+            final ResponseException exception = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithUser(
+                    user,
+                    new Request(
+                        "GET",
+                        String.format(
+                            Locale.ROOT,
+                            "/my_remote_cluster:%s/_search?ccs_minimize_roundtrips=%s&ignore_unavailable=true",
+                            randomFrom("test1::failures", "test*::failures", "*::failures", "other1::failures", "non-existing::failures"),
+                            ccsMinimizeRoundtrips
+                        )
+                    )
+                )
+            );
+            assertSelectorsNotSupported(exception);
+        }
+    }
+
+    private void testResolveRemoteClustersIsUnauthorized() {
+        // user with only read_failure_store access should not be able to resolve remote clusters
+        var exc = expectThrows(
+            ResponseException.class,
+            () -> performRequestWithUser(ONLY_READ_FAILURE_STORE_ACCESS, new Request("GET", "/_resolve/cluster/" + REMOTE_CLUSTER_ALIAS))
+        );
+        assertThat(exc.getResponse().getStatusLine().getStatusCode(), equalTo(403));
+        assertThat(
+            exc.getMessage(),
+            containsString(
+                "action ["
+                    + "indices:admin/resolve/cluster"
+                    + "] is unauthorized for user ["
+                    + ONLY_READ_FAILURE_STORE_ACCESS
+                    + "] "
+                    + "with effective roles ["
+                    + ONLY_READ_FAILURE_STORE_ACCESS
+                    + "]"
+            )
+        );
+    }
+
+    private static void setupLocalDataOnQueryCluster() throws IOException {
+        // Index some documents, to use them in a mixed-cluster search
+        final var indexDocRequest = new Request("POST", "/local_index/_doc?refresh=true");
+        indexDocRequest.setJsonEntity("{\"local_foo\": \"local_bar\"}");
+        assertOK(client().performRequest(indexDocRequest));
+    }
+
+    private static void setupUserAndRoleOnQueryCluster() throws IOException {
+        createRole(adminClient(), ALL_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": ["*"],
+                  "privileges": ["all"]
+                }
+              ]
+            }""");
+        createUser(adminClient(), ALL_ACCESS, PASS, ALL_ACCESS);
+        // the role must simply exist on query cluster, the access is irrelevant,
+        // but we here grant the access to local_index only to test mixed search
+        createRole(adminClient(), FAILURE_STORE_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": ["local_index"],
+                  "privileges": ["read"]
+                }
+              ]
+            }""");
+        createUser(adminClient(), FAILURE_STORE_ACCESS, PASS, FAILURE_STORE_ACCESS);
+        createRole(adminClient(), DATA_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": ["local_index"],
+                  "privileges": ["read"]
+                }
+              ]
+            }""");
+        createUser(adminClient(), DATA_ACCESS, PASS, DATA_ACCESS);
+        createRole(adminClient(), MANAGE_FAILURE_STORE_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": ["local_index"],
+                  "privileges": ["read"]
+                }
+              ]
+            }""");
+        createUser(adminClient(), MANAGE_FAILURE_STORE_ACCESS, PASS, MANAGE_FAILURE_STORE_ACCESS);
+
+        createRole(adminClient(), ONLY_READ_FAILURE_STORE_ACCESS, """
+            {
+            }""");
+        createUser(adminClient(), ONLY_READ_FAILURE_STORE_ACCESS, PASS, ONLY_READ_FAILURE_STORE_ACCESS);
+        createRole(adminClient(), BACKING_FAILURE_STORE_INDEX_ACCESS, """
+            {
+            }""");
+        createUser(adminClient(), BACKING_FAILURE_STORE_INDEX_ACCESS, PASS, BACKING_FAILURE_STORE_INDEX_ACCESS);
+        createRole(adminClient(), BACKING_DATA_INDEX_ACCESS, """
+            {
+            }""");
+        createUser(adminClient(), BACKING_DATA_INDEX_ACCESS, PASS, BACKING_DATA_INDEX_ACCESS);
+
+    }
+
+    private static void createRole(RestClient client, String role, String roleDescriptor) throws IOException {
+        final Request putRoleRequest = new Request("PUT", "/_security/role/" + role);
+        putRoleRequest.setJsonEntity(roleDescriptor);
+        assertOK(client.performRequest(putRoleRequest));
+    }
+
+    private static void createUser(RestClient client, String user, SecureString password, String role) throws IOException {
+        final Request putUserRequest = new Request("PUT", "/_security/user/" + user);
+        putUserRequest.setJsonEntity(Strings.format("""
+            {
+              "password": "%s",
+              "roles" : ["%s"]
+            }""", password.toString(), role));
+        assertOK(client.performRequest(putUserRequest));
+    }
+
+    private static void setupRoleAndUserOnFulfillingCluster() throws IOException {
+        putRoleOnFulfillingCluster(DATA_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": ["test*"],
+                  "privileges": ["read", "read_cross_cluster"]
+                }
+              ]
+            }""");
+        putUserOnFulfillingCluster(DATA_ACCESS, DATA_ACCESS);
+
+        putRoleOnFulfillingCluster(FAILURE_STORE_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": ["test*", "non-existing-index"],
+                  "privileges": ["read", "read_cross_cluster", "read_failure_store"]
+                }
+              ]
+            }""");
+        putUserOnFulfillingCluster(FAILURE_STORE_ACCESS, FAILURE_STORE_ACCESS);
+
+        putRoleOnFulfillingCluster(MANAGE_FAILURE_STORE_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": ["test*", "non-existing-index"],
+                  "privileges": ["manage_failure_store", "read_cross_cluster", "read_failure_store"]
+                }
+              ]
+            }""");
+        putUserOnFulfillingCluster(MANAGE_FAILURE_STORE_ACCESS, MANAGE_FAILURE_STORE_ACCESS);
+
+        putRoleOnFulfillingCluster(ALL_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": ["*"],
+                  "privileges": ["all"]
+                }
+              ]
+            }""");
+        putUserOnFulfillingCluster(ALL_ACCESS, ALL_ACCESS);
+
+        putRoleOnFulfillingCluster(ONLY_READ_FAILURE_STORE_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": ["test*", "non-existing-index"],
+                  "privileges": ["read_failure_store"]
+                }
+              ]
+            }""");
+        putUserOnFulfillingCluster(ONLY_READ_FAILURE_STORE_ACCESS, ONLY_READ_FAILURE_STORE_ACCESS);
+
+        putRoleOnFulfillingCluster(BACKING_DATA_INDEX_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": [".ds-test*"],
+                  "privileges": ["read", "read_cross_cluster"]
+                }
+              ]
+            }""");
+        putUserOnFulfillingCluster(BACKING_DATA_INDEX_ACCESS, BACKING_DATA_INDEX_ACCESS);
+
+        putRoleOnFulfillingCluster(BACKING_FAILURE_STORE_INDEX_ACCESS, """
+            {
+              "indices": [
+                {
+                  "names": [".fs-test*"],
+                  "privileges": ["read", "read_cross_cluster"]
+                }
+              ]
+            }""");
+        putUserOnFulfillingCluster(BACKING_FAILURE_STORE_INDEX_ACCESS, BACKING_FAILURE_STORE_INDEX_ACCESS);
+    }
+
+    private static void putRoleOnFulfillingCluster(String roleName, String roleDescriptor) throws IOException {
+        Request request = new Request("PUT", "/_security/role/" + roleName);
+        request.setJsonEntity(roleDescriptor);
+        assertOK(performRequestAgainstFulfillingCluster(request));
+    }
+
+    private static void putUserOnFulfillingCluster(String user, String role) throws IOException {
+        Request request = new Request("PUT", "/_security/user/" + user);
+        request.setJsonEntity(Strings.format("""
+            {
+              "password": "%s",
+              "roles" : ["%s"]
+            }""", PASS.toString(), role));
+        assertOK(performRequestAgainstFulfillingCluster(request));
+    }
+
+    private static void assertActionUnauthorized(
+        ResponseException exception,
+        String userAndRole,
+        String action,
+        String backingFailureIndexName
+    ) {
+        assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(403));
+        assertThat(
+            exception.getMessage(),
+            containsString(
+                "action ["
+                    + action
+                    + "] is unauthorized for user ["
+                    + userAndRole
+                    + "] "
+                    + "with effective roles ["
+                    + userAndRole
+                    + "] on indices ["
+                    + backingFailureIndexName
+                    + "]"
+            )
+        );
+    }
+}

+ 217 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityRCS2FailureStoreRestIT.java

@@ -0,0 +1,217 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.remotecluster;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.FeatureFlag;
+import org.elasticsearch.test.cluster.local.distribution.DistributionType;
+import org.elasticsearch.test.cluster.util.resource.Resource;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+public class RemoteClusterSecurityRCS2FailureStoreRestIT extends AbstractRemoteClusterSecurityFailureStoreRestIT {
+
+    private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
+
+    static {
+        fulfillingCluster = ElasticsearchCluster.local()
+            .distribution(DistributionType.DEFAULT)
+            .name("fulfilling-cluster")
+            .apply(commonClusterConfig)
+            .feature(FeatureFlag.FAILURE_STORE_ENABLED)
+            .setting("remote_cluster_server.enabled", "true")
+            .setting("remote_cluster.port", "0")
+            .setting("xpack.security.remote_cluster_server.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
+            .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
+            .setting("xpack.security.authc.token.enabled", "true")
+            .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
+            .build();
+
+        queryCluster = ElasticsearchCluster.local()
+            .distribution(DistributionType.DEFAULT)
+            .name("query-cluster")
+            .apply(commonClusterConfig)
+            .feature(FeatureFlag.FAILURE_STORE_ENABLED)
+            .setting("xpack.security.remote_cluster_client.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
+            .setting("xpack.security.authc.token.enabled", "true")
+            .keystore("cluster.remote.my_remote_cluster.credentials", () -> {
+                API_KEY_MAP_REF.compareAndSet(null, createCrossClusterAccessApiKey("""
+                    {
+                        "search": [
+                          {
+                              "names": ["test*"]
+                          }
+                        ]
+                    }"""));
+                return (String) API_KEY_MAP_REF.get().get("encoded");
+            })
+            .rolesFile(Resource.fromClasspath("roles.yml"))
+            .build();
+    }
+
+    @ClassRule
+    // Use a RuleChain to ensure that fulfilling cluster is started before query cluster
+    public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
+
+    public void testRCS2CrossClusterSearch() throws Exception {
+        // configure remote cluster using API Key-based authentication
+        configureRemoteCluster();
+        final String crossClusterAccessApiKeyId = (String) API_KEY_MAP_REF.get().get("id");
+        final boolean ccsMinimizeRoundtrips = randomBoolean();
+
+        // fulfilling cluster setup
+        setupTestDataStreamOnFulfillingCluster();
+
+        // query cluster setup
+        setupLocalDataOnQueryCluster();
+        setupUserAndRoleOnQueryCluster();
+
+        final Tuple<String, String> backingIndices = getSingleDataAndFailureIndices("test1");
+        final String backingDataIndexName = backingIndices.v1();
+        final String backingFailureIndexName = backingIndices.v2();
+        {
+            // query remote cluster without selectors should succeed
+            final boolean alsoSearchLocally = randomBoolean();
+            final Request dataSearchRequest = new Request(
+                "GET",
+                String.format(
+                    Locale.ROOT,
+                    "/%s%s:%s/_search?ccs_minimize_roundtrips=%s&ignore_unavailable=false",
+                    alsoSearchLocally ? "local_index," : "",
+                    randomFrom("my_remote_cluster", "*", "my_remote_*"),
+                    randomFrom("test1", "test*", "*", backingDataIndexName),
+                    ccsMinimizeRoundtrips
+                )
+            );
+            final String[] expectedIndices = alsoSearchLocally
+                ? new String[] { "local_index", backingDataIndexName }
+                : new String[] { backingDataIndexName };
+            assertSearchResponseContainsIndices(performRequestWithRemoteSearchUser(dataSearchRequest), expectedIndices);
+        }
+        {
+            // query remote cluster using ::data selector should fail
+            final boolean alsoSearchLocally = randomBoolean();
+            final Request dataSearchRequest = new Request(
+                "GET",
+                String.format(
+                    Locale.ROOT,
+                    "/%s:%s/_search?ccs_minimize_roundtrips=%s&ignore_unavailable=false",
+                    randomFrom("my_remote_cluster", "*", "my_remote_*"),
+                    randomFrom("test1::data", "test*::data", "*::data", "non-existing::data"),
+                    ccsMinimizeRoundtrips
+                )
+            );
+            final ResponseException exception = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithRemoteSearchUser(dataSearchRequest)
+            );
+            assertSelectorsNotSupported(exception);
+        }
+        {
+            // query remote cluster using ::failures selector should fail
+            final ResponseException exception = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithRemoteSearchUser(
+                    new Request(
+                        "GET",
+                        String.format(
+                            Locale.ROOT,
+                            "/my_remote_cluster:%s/_search?ccs_minimize_roundtrips=%s",
+                            randomFrom("test1::failures", "test*::failures", "*::failures", "non-existing::failures"),
+                            ccsMinimizeRoundtrips
+                        )
+                    )
+                )
+            );
+            assertSelectorsNotSupported(exception);
+        }
+        {
+            // direct access to backing failure index is not allowed - no explicit read privileges over .fs-* indices
+            Request failureIndexSearchRequest = new Request(
+                "GET",
+                String.format(
+                    Locale.ROOT,
+                    "/my_remote_cluster:%s/_search?ccs_minimize_roundtrips=%s",
+                    backingFailureIndexName,
+                    ccsMinimizeRoundtrips
+                )
+            );
+            final ResponseException exception = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithRemoteSearchUser(failureIndexSearchRequest)
+            );
+            assertThat(exception.getResponse().getStatusLine().getStatusCode(), equalTo(403));
+            assertThat(
+                exception.getMessage(),
+                containsString(
+                    "action ["
+                        + (ccsMinimizeRoundtrips ? "indices:data/read/search" : "indices:admin/search/search_shards")
+                        + "] towards remote cluster is unauthorized for user [remote_search_user] "
+                        + "with assigned roles [remote_search] authenticated by API key id ["
+                        + crossClusterAccessApiKeyId
+                        + "] of user [test_user] on indices ["
+                        + backingFailureIndexName
+                        + "], this action is granted by the index privileges ["
+                        + (ccsMinimizeRoundtrips ? "read,all" : "view_index_metadata,manage,read_cross_cluster,all")
+                        + "]"
+                )
+            );
+        }
+    }
+
+    private static void setupLocalDataOnQueryCluster() throws IOException {
+        // Index some documents, to use them in a mixed-cluster search
+        final var indexDocRequest = new Request("POST", "/local_index/_doc?refresh=true");
+        indexDocRequest.setJsonEntity("{\"local_foo\": \"local_bar\"}");
+        assertOK(client().performRequest(indexDocRequest));
+    }
+
+    private static void setupUserAndRoleOnQueryCluster() throws IOException {
+        final var putRoleRequest = new Request("PUT", "/_security/role/" + REMOTE_SEARCH_ROLE);
+        putRoleRequest.setJsonEntity("""
+            {
+              "description": "Role with privileges for remote and local indices.",
+              "indices": [
+                {
+                  "names": ["local_index"],
+                  "privileges": ["read"]
+                }
+              ],
+              "remote_indices": [
+                {
+                  "names": ["test*"],
+                  "privileges": ["read", "read_cross_cluster"],
+                  "clusters": ["my_remote_cluster"]
+                }
+              ]
+            }""");
+        assertOK(adminClient().performRequest(putRoleRequest));
+        final var putUserRequest = new Request("PUT", "/_security/user/" + REMOTE_SEARCH_USER);
+        putUserRequest.setJsonEntity("""
+            {
+              "password": "x-pack-test-password",
+              "roles" : ["remote_search"]
+            }""");
+        assertOK(adminClient().performRequest(putUserRequest));
+    }
+
+}