Browse Source

[CCR] Auto follow Coordinator fetch cluster state in system context (#35120)

Auto follow Coordinator should fetch the leader cluster state using system context.
Martijn van Groningen 7 years ago
parent
commit
8a85251da0

+ 21 - 11
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

@@ -103,9 +103,8 @@ public final class CcrLicenseChecker {
      * @param leaderIndex   the name of the leader index
      * @param onFailure     the failure consumer
      * @param consumer      the consumer for supplying the leader index metadata and historyUUIDs of all leader shards
-     * @param <T>           the type of response the listener is waiting for
      */
-    public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
+    public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
             final Client client,
             final String clusterAlias,
             final String leaderIndex,
@@ -118,8 +117,8 @@ public final class CcrLicenseChecker {
         request.indices(leaderIndex);
         checkRemoteClusterLicenseAndFetchClusterState(
                 client,
-                Collections.emptyMap(),
                 clusterAlias,
+                client.getRemoteClusterClient(clusterAlias),
                 request,
                 onFailure,
                 leaderClusterState -> {
@@ -151,22 +150,20 @@ public final class CcrLicenseChecker {
      *
      * @param client                     the client
      * @param clusterAlias               the remote cluster alias
-     * @param headers                    the headers to use for leader client
      * @param request                    the cluster state request
      * @param onFailure                  the failure consumer
      * @param leaderClusterStateConsumer the leader cluster state consumer
      */
     public void checkRemoteClusterLicenseAndFetchClusterState(
             final Client client,
-            final Map<String, String> headers,
             final String clusterAlias,
             final ClusterStateRequest request,
             final Consumer<Exception> onFailure,
             final Consumer<ClusterState> leaderClusterStateConsumer) {
         checkRemoteClusterLicenseAndFetchClusterState(
                 client,
-                headers,
                 clusterAlias,
+                systemClient(client.getRemoteClusterClient(clusterAlias)),
                 request,
                 onFailure,
                 leaderClusterStateConsumer,
@@ -182,18 +179,17 @@ public final class CcrLicenseChecker {
      *
      * @param client                     the client
      * @param clusterAlias               the remote cluster alias
-     * @param headers                    the headers to use for leader client
+     * @param leaderClient               the leader client to use to execute cluster state API
      * @param request                    the cluster state request
      * @param onFailure                  the failure consumer
      * @param leaderClusterStateConsumer the leader cluster state consumer
      * @param nonCompliantLicense        the supplier for when the license state of the remote cluster is non-compliant
      * @param unknownLicense             the supplier for when the license state of the remote cluster is unknown due to failure
-     * @param <T>                        the type of response the listener is waiting for
      */
-    private <T> void checkRemoteClusterLicenseAndFetchClusterState(
+    private void checkRemoteClusterLicenseAndFetchClusterState(
             final Client client,
-            final Map<String, String> headers,
             final String clusterAlias,
+            final Client leaderClient,
             final ClusterStateRequest request,
             final Consumer<Exception> onFailure,
             final Consumer<ClusterState> leaderClusterStateConsumer,
@@ -207,7 +203,6 @@ public final class CcrLicenseChecker {
                     @Override
                     public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
                         if (licenseCheck.isSuccess()) {
-                            final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers);
                             final ActionListener<ClusterStateResponse> clusterStateListener =
                                     ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
                             // following an index in remote cluster, so use remote client to fetch leader index metadata
@@ -361,6 +356,21 @@ public final class CcrLicenseChecker {
         }
     }
 
+    private static Client systemClient(Client client) {
+        final ThreadContext threadContext = client.threadPool().getThreadContext();
+        return new FilterClient(client) {
+            @Override
+            protected <Request extends ActionRequest, Response extends ActionResponse>
+            void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
+                final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
+                try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
+                    threadContext.markAsSystemContext();
+                    super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
+                }
+            }
+        };
+    }
+
     private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
         final ThreadContext.StoredContext storedContext = threadContext.stashContext();
         threadContext.copyHeaders(headers.entrySet());

+ 2 - 7
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

@@ -159,8 +159,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
         AutoFollower operation = new AutoFollower(handler, followerClusterState) {
 
             @Override
-            void getLeaderClusterState(final Map<String, String> headers,
-                                       final String remoteCluster,
+            void getLeaderClusterState(final String remoteCluster,
                                        final BiConsumer<ClusterState, Exception> handler) {
                 final ClusterStateRequest request = new ClusterStateRequest();
                 request.clear();
@@ -168,7 +167,6 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
                 // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
                 ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
                     client,
-                    headers,
                     remoteCluster,
                     request,
                     e -> handler.accept(null, e),
@@ -249,7 +247,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
                 final String remoteCluster = autoFollowPattern.getRemoteCluster();
 
                 Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName);
-                getLeaderClusterState(headers, remoteCluster, (leaderClusterState, e) -> {
+                getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> {
                     if (leaderClusterState != null) {
                         assert e == null;
                         final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName);
@@ -413,13 +411,10 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
 
         /**
          * Fetch the cluster state from the leader with the specified cluster alias
-         *
-         * @param headers            the client headers
          * @param remoteCluster      the name of the leader cluster
          * @param handler            the callback to invoke
          */
         abstract void getLeaderClusterState(
-            Map<String, String> headers,
             String remoteCluster,
             BiConsumer<ClusterState, Exception> handler
         );

+ 5 - 13
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

@@ -105,20 +105,12 @@ public final class TransportPutFollowAction
         client.getRemoteClusterClient(remoteCluster);
 
         String leaderIndex = request.getLeaderIndex();
-        createFollowerIndexAndFollowRemoteIndex(request, remoteCluster, leaderIndex, listener);
-    }
-
-    private void createFollowerIndexAndFollowRemoteIndex(
-            final PutFollowAction.Request request,
-            final String remoteCluster,
-            final String leaderIndex,
-            final ActionListener<PutFollowAction.Response> listener) {
         ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
-                client,
-                remoteCluster,
-                leaderIndex,
-                listener::onFailure,
-                (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
+            client,
+            remoteCluster,
+            leaderIndex,
+            listener::onFailure,
+            (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
     }
 
     private void createFollowerIndex(

+ 5 - 9
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

@@ -83,10 +83,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
         };
         AutoFollower autoFollower = new AutoFollower(handler, currentState) {
             @Override
-            void getLeaderClusterState(Map<String, String> headers,
-                                       String remoteCluster,
+            void getLeaderClusterState(String remoteCluster,
                                        BiConsumer<ClusterState, Exception> handler) {
-                assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
+                assertThat(remoteCluster, equalTo("remote"));
                 handler.accept(leaderState, null);
             }
 
@@ -143,8 +142,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
         };
         AutoFollower autoFollower = new AutoFollower(handler, followerState) {
             @Override
-            void getLeaderClusterState(Map<String, String> headers,
-                                       String remoteCluster,
+            void getLeaderClusterState(String remoteCluster,
                                        BiConsumer<ClusterState, Exception> handler) {
                 handler.accept(null, failure);
             }
@@ -204,8 +202,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
         };
         AutoFollower autoFollower = new AutoFollower(handler, followerState) {
             @Override
-            void getLeaderClusterState(Map<String, String> headers,
-                                       String remoteCluster,
+            void getLeaderClusterState(String remoteCluster,
                                        BiConsumer<ClusterState, Exception> handler) {
                 handler.accept(leaderState, null);
             }
@@ -267,8 +264,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
         };
         AutoFollower autoFollower = new AutoFollower(handler, followerState) {
             @Override
-            void getLeaderClusterState(Map<String, String> headers,
-                                       String remoteCluster,
+            void getLeaderClusterState(String remoteCluster,
                                        BiConsumer<ClusterState, Exception> handler) {
                 handler.accept(leaderState, null);
             }