Browse Source

Support transform for RCS 2.0 (#95169)

This PR adds support for transform jobs with remote indices for RCS 2.0.
Yang Wang 2 years ago
parent
commit
d5e49f3154

+ 5 - 0
test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

@@ -1408,6 +1408,11 @@ public abstract class ESRestTestCase extends ESTestCase {
         assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
         assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
     }
     }
 
 
+    public static ObjectPath assertOKAndCreateObjectPath(Response response) throws IOException {
+        assertOK(response);
+        return ObjectPath.createFromResponse(response);
+    }
+
     /**
     /**
      * Assert that the index in question has the given number of documents present
      * Assert that the index in question has the given number of documents present
      */
      */

+ 3 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/ClusterPrivilegeResolver.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.transport.RemoteClusterService;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.TcpTransport;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.xpack.core.action.XPackInfoAction;
 import org.elasticsearch.xpack.core.ilm.action.GetLifecycleAction;
 import org.elasticsearch.xpack.core.ilm.action.GetLifecycleAction;
 import org.elasticsearch.xpack.core.ilm.action.GetStatusAction;
 import org.elasticsearch.xpack.core.ilm.action.GetStatusAction;
 import org.elasticsearch.xpack.core.ilm.action.StartILMAction;
 import org.elasticsearch.xpack.core.ilm.action.StartILMAction;
@@ -156,7 +157,8 @@ public class ClusterPrivilegeResolver {
 
 
     private static final Set<String> CROSS_CLUSTER_ACCESS_PATTERN = Set.of(
     private static final Set<String> CROSS_CLUSTER_ACCESS_PATTERN = Set.of(
         RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME,
         RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME,
-        RemoteClusterNodesAction.NAME
+        RemoteClusterNodesAction.NAME,
+        XPackInfoAction.NAME
     );
     );
     private static final Set<String> MANAGE_ENRICH_AUTOMATON = Set.of("cluster:admin/xpack/enrich/*");
     private static final Set<String> MANAGE_ENRICH_AUTOMATON = Set.of("cluster:admin/xpack/enrich/*");
 
 

+ 2 - 0
x-pack/plugin/security/qa/multi-cluster/build.gradle

@@ -14,6 +14,8 @@ apply plugin: 'elasticsearch.bwc-test'
 
 
 dependencies {
 dependencies {
   clusterModules(project(":modules:analysis-common"))
   clusterModules(project(":modules:analysis-common"))
+  clusterModules(project(":modules:reindex")) // need for deleting transform jobs
+  clusterModules(project(":x-pack:plugin:transform"))
 }
 }
 
 
 tasks.named("javaRestTest") {
 tasks.named("javaRestTest") {

+ 6 - 7
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/AbstractRemoteClusterSecurityTestCase.java

@@ -24,7 +24,7 @@ import org.elasticsearch.test.cluster.FeatureFlag;
 import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
 import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
 import org.elasticsearch.test.cluster.util.resource.Resource;
 import org.elasticsearch.test.cluster.util.resource.Resource;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.test.rest.ESRestTestCase;
-import org.elasticsearch.xcontent.ObjectPath;
+import org.elasticsearch.test.rest.ObjectPath;
 import org.junit.AfterClass;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.BeforeClass;
 
 
@@ -35,7 +35,6 @@ import java.util.Base64;
 import java.util.Map;
 import java.util.Map;
 
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.nullValue;
 
 
@@ -45,6 +44,7 @@ public abstract class AbstractRemoteClusterSecurityTestCase extends ESRestTestCa
     protected static final SecureString PASS = new SecureString("x-pack-test-password".toCharArray());
     protected static final SecureString PASS = new SecureString("x-pack-test-password".toCharArray());
     protected static final String REMOTE_SEARCH_USER = "remote_search_user";
     protected static final String REMOTE_SEARCH_USER = "remote_search_user";
     protected static final String REMOTE_METRIC_USER = "remote_metric_user";
     protected static final String REMOTE_METRIC_USER = "remote_metric_user";
+    protected static final String REMOTE_TRANSFORM_USER = "remote_transform_user";
     protected static final String REMOTE_SEARCH_ROLE = "remote_search";
     protected static final String REMOTE_SEARCH_ROLE = "remote_search";
 
 
     protected static LocalClusterConfigProvider commonClusterConfig = cluster -> cluster.module("analysis-common")
     protected static LocalClusterConfigProvider commonClusterConfig = cluster -> cluster.module("analysis-common")
@@ -188,13 +188,12 @@ public abstract class AbstractRemoteClusterSecurityTestCase extends ESRestTestCa
         assertBusy(() -> {
         assertBusy(() -> {
             final Response remoteInfoResponse = adminClient().performRequest(remoteInfoRequest);
             final Response remoteInfoResponse = adminClient().performRequest(remoteInfoRequest);
             assertOK(remoteInfoResponse);
             assertOK(remoteInfoResponse);
-            final Map<String, Object> remoteInfoMap = responseAsMap(remoteInfoResponse);
-            assertThat(remoteInfoMap, hasKey(clusterAlias));
-            assertThat(ObjectPath.eval(clusterAlias + ".connected", remoteInfoMap), is(true));
+            final ObjectPath remoteInfoObjectPath = assertOKAndCreateObjectPath(remoteInfoResponse);
+            assertThat(remoteInfoObjectPath.evaluate(clusterAlias + ".connected"), is(true));
             if (false == isProxyMode) {
             if (false == isProxyMode) {
-                assertThat(ObjectPath.eval(clusterAlias + ".num_nodes_connected", remoteInfoMap), equalTo(numberOfFcNodes));
+                assertThat(remoteInfoObjectPath.evaluate(clusterAlias + ".num_nodes_connected"), equalTo(numberOfFcNodes));
             }
             }
-            final String credentialsValue = ObjectPath.eval(clusterAlias + ".cluster_credentials", remoteInfoMap);
+            final String credentialsValue = remoteInfoObjectPath.evaluate(clusterAlias + ".cluster_credentials");
             if (basicSecurity) {
             if (basicSecurity) {
                 assertThat(credentialsValue, nullValue());
                 assertThat(credentialsValue, nullValue());
             } else {
             } else {

+ 232 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityTransformIT.java

@@ -0,0 +1,232 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.remotecluster;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.cluster.util.resource.Resource;
+import org.elasticsearch.test.rest.ObjectPath;
+import org.junit.ClassRule;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TestRule;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+
+public class RemoteClusterSecurityTransformIT extends AbstractRemoteClusterSecurityTestCase {
+
+    private static final AtomicReference<Map<String, Object>> API_KEY_MAP_REF = new AtomicReference<>();
+
+    static {
+        fulfillingCluster = ElasticsearchCluster.local()
+            .name("fulfilling-cluster")
+            .apply(commonClusterConfig)
+            .module("transform")
+            .module("reindex")
+            .setting("remote_cluster_server.enabled", "true")
+            .setting("remote_cluster.port", "0")
+            .setting("xpack.security.remote_cluster_server.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_server.ssl.key", "remote-cluster.key")
+            .setting("xpack.security.remote_cluster_server.ssl.certificate", "remote-cluster.crt")
+            .setting("xpack.security.authc.token.enabled", "true")
+            .keystore("xpack.security.remote_cluster_server.ssl.secure_key_passphrase", "remote-cluster-password")
+            .build();
+
+        queryCluster = ElasticsearchCluster.local()
+            .name("query-cluster")
+            .apply(commonClusterConfig)
+            .module("transform")
+            .module("reindex")
+            .setting("xpack.security.remote_cluster_client.ssl.enabled", "true")
+            .setting("xpack.security.remote_cluster_client.ssl.certificate_authorities", "remote-cluster-ca.crt")
+            .setting("xpack.security.authc.token.enabled", "true")
+            .keystore("cluster.remote.my_remote_cluster.credentials", () -> {
+                API_KEY_MAP_REF.compareAndSet(null, createCrossClusterAccessApiKey("""
+                    [
+                      {
+                         "names": ["shared-transform-index"],
+                         "privileges": ["read", "read_cross_cluster", "view_index_metadata"]
+                      }
+                    ]"""));
+                return (String) API_KEY_MAP_REF.get().get("encoded");
+            })
+            .rolesFile(Resource.fromClasspath("roles.yml"))
+            .user(REMOTE_TRANSFORM_USER, PASS.toString(), "transform_admin,transform_remote_shared_index")
+            .build();
+    }
+
+    @ClassRule
+    // Use a RuleChain to ensure that fulfilling cluster is started before query cluster
+    public static TestRule clusterRule = RuleChain.outerRule(fulfillingCluster).around(queryCluster);
+
+    public void testCrossClusterTransform() throws Exception {
+        configureRemoteClusters();
+
+        // Fulfilling cluster
+        {
+            final Request createIndexRequest1 = new Request("PUT", "shared-transform-index");
+            createIndexRequest1.setJsonEntity("""
+                {
+                  "mappings": {
+                    "properties": {
+                      "user": { "type": "keyword" },
+                      "stars": { "type": "integer" },
+                      "coolness": { "type": "integer" }
+                    }
+                  }
+                }""");
+            assertOK(performRequestAgainstFulfillingCluster(createIndexRequest1));
+
+            // Index some documents, so we can attempt to transform them from the querying cluster
+            final Request bulkRequest1 = new Request("POST", "/_bulk?refresh=true");
+            bulkRequest1.setJsonEntity(Strings.format("""
+                {"index": {"_index": "shared-transform-index"}}
+                {"user": "a", "stars": 1}
+                {"index": {"_index": "shared-transform-index"}}
+                {"user": "a", "stars": 4}
+                {"index": {"_index": "shared-transform-index"}}
+                {"user": "a", "stars": 5}
+                {"index": {"_index": "shared-transform-index"}}
+                {"user": "b", "stars": 2}
+                {"index": {"_index": "shared-transform-index"}}
+                {"user": "b", "stars": 3}
+                {"index": {"_index": "shared-transform-index"}}
+                {"user": "a", "stars": 5}
+                {"index": {"_index": "shared-transform-index"}}
+                {"user": "b", "stars": 1}
+                {"index": {"_index": "shared-transform-index"}}
+                {"user": "a", "stars": 3}
+                {"index": {"_index": "shared-transform-index"}}
+                {"user": "c", "stars": 4}
+                """));
+            assertOK(performRequestAgainstFulfillingCluster(bulkRequest1));
+
+            // Create another index that the transform user does not have access
+            final Request createIndexRequest2 = new Request("PUT", "private-transform-index");
+            createIndexRequest2.setJsonEntity("""
+                {
+                  "mappings": {
+                    "properties": {
+                      "user": { "type": "keyword" },
+                      "stars": { "type": "integer" },
+                      "coolness": { "type": "integer" }
+                    }
+                  }
+                }""");
+            assertOK(performRequestAgainstFulfillingCluster(createIndexRequest2));
+        }
+
+        // Query cluster
+        {
+            // Create a transform
+            final var putTransformRequest = new Request("PUT", "/_transform/simple-remote-transform");
+            putTransformRequest.setJsonEntity("""
+                {
+                  "source": { "index": "my_remote_cluster:shared-transform-index" },
+                  "dest": { "index": "simple-remote-transform" },
+                  "pivot": {
+                    "group_by": { "user": {"terms": {"field": "user"}}},
+                    "aggs": {"avg_stars": {"avg": {"field": "stars"}}}
+                  }
+                }
+                """);
+            assertOK(performRequestWithRemoteTransformUser(putTransformRequest));
+            final ObjectPath getTransformObjPath = assertOKAndCreateObjectPath(
+                performRequestWithRemoteTransformUser(new Request("GET", "/_transform/simple-remote-transform"))
+            );
+            assertThat(getTransformObjPath.evaluate("count"), equalTo(1));
+            assertThat(getTransformObjPath.evaluate("transforms.0.id"), equalTo("simple-remote-transform"));
+
+            // Start the transform
+            assertOK(performRequestWithRemoteTransformUser(new Request("POST", "/_transform/simple-remote-transform/_start")));
+
+            // Get the stats
+            final Request transformStatsRequest = new Request("GET", "/_transform/simple-remote-transform/_stats");
+            final ObjectPath transformStatsObjPath = assertOKAndCreateObjectPath(
+                performRequestWithRemoteTransformUser(transformStatsRequest)
+            );
+            assertThat(transformStatsObjPath.evaluate("node_failures"), nullValue());
+            assertThat(transformStatsObjPath.evaluate("count"), equalTo(1));
+            assertThat(transformStatsObjPath.evaluate("transforms.0.id"), equalTo("simple-remote-transform"));
+
+            // Stop the transform and force it to complete
+            assertOK(
+                performRequestWithRemoteTransformUser(
+                    new Request("POST", "/_transform/simple-remote-transform/_stop?wait_for_completion=true&wait_for_checkpoint=true")
+                )
+            );
+
+            // Get stats again
+            final ObjectPath transformStatsObjPath2 = assertOKAndCreateObjectPath(
+                performRequestWithRemoteTransformUser(transformStatsRequest)
+            );
+            assertThat(transformStatsObjPath2.evaluate("node_failures"), nullValue());
+            assertThat(transformStatsObjPath2.evaluate("count"), equalTo(1));
+            assertThat(transformStatsObjPath2.evaluate("transforms.0.state"), equalTo("stopped"));
+            assertThat(transformStatsObjPath2.evaluate("transforms.0.checkpointing.last.checkpoint"), equalTo(1));
+
+            // Ensure transformed data is available locally
+            final ObjectPath searchObjPath = assertOKAndCreateObjectPath(
+                performRequestWithRemoteTransformUser(
+                    new Request("GET", "/simple-remote-transform/_search?sort=user&rest_total_hits_as_int=true")
+                )
+            );
+            assertThat(searchObjPath.evaluate("hits.total"), equalTo(3));
+            assertThat(searchObjPath.evaluate("hits.hits.0._index"), equalTo("simple-remote-transform"));
+            assertThat(searchObjPath.evaluate("hits.hits.0._source.avg_stars"), equalTo(3.6));
+            assertThat(searchObjPath.evaluate("hits.hits.0._source.user"), equalTo("a"));
+            assertThat(searchObjPath.evaluate("hits.hits.1._source.avg_stars"), equalTo(2.0));
+            assertThat(searchObjPath.evaluate("hits.hits.1._source.user"), equalTo("b"));
+            assertThat(searchObjPath.evaluate("hits.hits.2._source.avg_stars"), equalTo(4.0));
+            assertThat(searchObjPath.evaluate("hits.hits.2._source.user"), equalTo("c"));
+
+            // Preview
+            assertOK(performRequestWithRemoteTransformUser(new Request("GET", "/_transform/simple-remote-transform/_preview")));
+
+            // Delete the transform
+            assertOK(performRequestWithRemoteTransformUser(new Request("DELETE", "/_transform/simple-remote-transform")));
+
+            // Create a transform targeting an index without permission
+            final var putTransformRequest2 = new Request("PUT", "/_transform/invalid");
+            putTransformRequest2.setJsonEntity("""
+                {
+                  "source": { "index": "my_remote_cluster:private-transform-index" },
+                  "dest": { "index": "simple-remote-transform" },
+                  "pivot": {
+                    "group_by": { "user": {"terms": {"field": "user"}}},
+                    "aggs": {"avg_stars": {"avg": {"field": "stars"}}}
+                  }
+                }
+                """);
+            assertOK(performRequestWithRemoteTransformUser(putTransformRequest2));
+            // It errors when trying to preview it
+            final ResponseException e = expectThrows(
+                ResponseException.class,
+                () -> performRequestWithRemoteTransformUser(new Request("GET", "/_transform/invalid/_preview"))
+            );
+            assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
+            assertThat(e.getMessage(), containsString("Source indices have been deleted or closed"));
+        }
+    }
+
+    private Response performRequestWithRemoteTransformUser(final Request request) throws IOException {
+        request.setOptions(
+            RequestOptions.DEFAULT.toBuilder().addHeader("Authorization", basicAuthHeaderValue(REMOTE_TRANSFORM_USER, PASS))
+        );
+        return client().performRequest(request);
+    }
+}

+ 8 - 0
x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/resources/roles.yml

@@ -10,3 +10,11 @@ read_remote_shared_metrics:
       privileges: [ 'read', 'read_cross_cluster' ]
       privileges: [ 'read', 'read_cross_cluster' ]
       clusters: [ 'my_*' ]
       clusters: [ 'my_*' ]
 
 
+transform_remote_shared_index:
+  indices:
+    - names: [ 'simple-remote-transform' ]
+      privileges: ['create_index', 'index', 'read']
+  remote_indices:
+    - names: [ 'shared-transform-index' ]
+      privileges: [ 'read', 'read_cross_cluster', 'view_index_metadata' ]
+      clusters: [ 'my_*' ]

+ 5 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java

@@ -221,13 +221,16 @@ public class AuthorizationService {
                 + subject.getUser().principal()
                 + subject.getUser().principal()
                 + "] is an internal user and we should never try to retrieve its roles descriptors towards a remote cluster";
                 + "] is an internal user and we should never try to retrieve its roles descriptors towards a remote cluster";
             assert false : message;
             assert false : message;
+            logger.warn(message);
             listener.onFailure(new IllegalArgumentException(message));
             listener.onFailure(new IllegalArgumentException(message));
             return;
             return;
         }
         }
 
 
         final AuthorizationEngine authorizationEngine = getAuthorizationEngineForSubject(subject);
         final AuthorizationEngine authorizationEngine = getAuthorizationEngineForSubject(subject);
-        final AuthorizationInfo authorizationInfo = threadContext.getTransient(AUTHORIZATION_INFO_KEY);
-        assert authorizationInfo != null : "authorization info must be available in thread context";
+        // AuthZ info can be null for persistent tasks
+        if (threadContext.<AuthorizationInfo>getTransient(AUTHORIZATION_INFO_KEY) == null) {
+            logger.debug("authorization info not available in thread context, resolving it for subject [{}]", subject);
+        }
         authorizationEngine.resolveAuthorizationInfo(
         authorizationEngine.resolveAuthorizationInfo(
             subject,
             subject,
             wrapPreservingContext(
             wrapPreservingContext(

+ 6 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/CrossClusterAccessServerTransportFilter.java

@@ -25,8 +25,10 @@ import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportActionProxy;
 import org.elasticsearch.transport.TransportActionProxy;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequest;
+import org.elasticsearch.xpack.core.action.XPackInfoAction;
 import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
+import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
 import org.elasticsearch.xpack.security.Security;
 import org.elasticsearch.xpack.security.Security;
 import org.elasticsearch.xpack.security.audit.AuditUtil;
 import org.elasticsearch.xpack.security.audit.AuditUtil;
 import org.elasticsearch.xpack.security.authc.CrossClusterAccessAuthenticationService;
 import org.elasticsearch.xpack.security.authc.CrossClusterAccessAuthenticationService;
@@ -88,7 +90,10 @@ final class CrossClusterAccessServerTransportFilter extends ServerTransportFilte
                 ResolveIndexAction.NAME,
                 ResolveIndexAction.NAME,
                 FieldCapabilitiesAction.NAME,
                 FieldCapabilitiesAction.NAME,
                 FieldCapabilitiesAction.NAME + "[n]",
                 FieldCapabilitiesAction.NAME + "[n]",
-                "indices:data/read/eql"
+                "indices:data/read/eql",
+                // transform
+                XPackInfoAction.NAME,
+                GetCheckpointAction.NAME
             )
             )
         ).collect(Collectors.toUnmodifiableSet());
         ).collect(Collectors.toUnmodifiableSet());
     }
     }

+ 32 - 18
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceIntegTests.java

@@ -170,24 +170,38 @@ public class AuthorizationServiceIntegTests extends SecurityIntegTestCase {
             // A request ID is set during authentication and is required for authorization; since we are not authenticating, set it
             // A request ID is set during authentication and is required for authorization; since we are not authenticating, set it
             // explicitly
             // explicitly
             AuditUtil.generateRequestId(threadContext);
             AuditUtil.generateRequestId(threadContext);
-            // Authorize to populate thread context with authz info
-            // Note that if the outer listener throws, we will not count down on the latch, however, we also won't get to the await call
-            // since the exception will be thrown before -- so no deadlock
-            authzService.authorize(
-                authentication,
-                AuthenticateAction.INSTANCE.name(),
-                AuthenticateRequest.INSTANCE,
-                ActionTestUtils.assertNoFailureListener(nothing -> {
-                    authzService.getRoleDescriptorsIntersectionForRemoteCluster(
-                        concreteClusterAlias,
-                        authentication.getEffectiveSubject(),
-                        new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(newValue -> {
-                            assertThat(threadContext.getTransient(AUTHORIZATION_INFO_KEY), not(nullValue()));
-                            actual.set(newValue);
-                        }), latch)
-                    );
-                })
-            );
+
+            // Get Role Descriptors for remote cluster should work regardless whether threadContext has existing authz info
+            if (randomBoolean()) {
+                // Authorize to populate thread context with authz info
+                // Note that if the outer listener throws, we will not count down on the latch, however, we also won't get to the await call
+                // since the exception will be thrown before -- so no deadlock
+                authzService.authorize(
+                    authentication,
+                    AuthenticateAction.INSTANCE.name(),
+                    AuthenticateRequest.INSTANCE,
+                    ActionTestUtils.assertNoFailureListener(nothing -> {
+                        authzService.getRoleDescriptorsIntersectionForRemoteCluster(
+                            concreteClusterAlias,
+                            authentication.getEffectiveSubject(),
+                            new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(newValue -> {
+                                assertThat(threadContext.getTransient(AUTHORIZATION_INFO_KEY), not(nullValue()));
+                                actual.set(newValue);
+                            }), latch)
+                        );
+                    })
+                );
+            } else {
+                authzService.getRoleDescriptorsIntersectionForRemoteCluster(
+                    concreteClusterAlias,
+                    authentication.getEffectiveSubject(),
+                    new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(newValue -> {
+                        assertThat(threadContext.getTransient(AUTHORIZATION_INFO_KEY), nullValue());
+                        actual.set(newValue);
+                    }), latch)
+                );
+            }
+
             latch.await();
             latch.await();
             // Validate original authz info is restored after call complete
             // Validate original authz info is restored after call complete
             assertThat(threadContext.getTransient(AUTHORIZATION_INFO_KEY), nullValue());
             assertThat(threadContext.getTransient(AUTHORIZATION_INFO_KEY), nullValue());