소스 검색

Ensure the correct threadContext for RemoteClusterNodesAction (#101050)

RemoteClusterNodesAction fetches NodesInfo with system context. It must
restore the original caller's context when respond back. This PR ensures
that.
Yang Wang 2 년 전
부모
커밋
c9835b8312

+ 5 - 0
docs/changelog/101050.yaml

@@ -0,0 +1,5 @@
+pr: 101050
+summary: Ensure the correct `threadContext` for `RemoteClusterNodesAction`
+area: Network
+type: bug
+issues: []

+ 9 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesAction.java

@@ -17,6 +17,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodesInfoMetrics;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.client.internal.Client;
@@ -100,6 +101,14 @@ public class RemoteClusterNodesAction {
         @Override
         protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
             final ThreadContext threadContext = client.threadPool().getThreadContext();
+            executeWithSystemContext(
+                request,
+                threadContext,
+                ContextPreservingActionListener.wrapPreservingContext(listener, threadContext)
+            );
+        }
+
+        private void executeWithSystemContext(Request request, ThreadContext threadContext, ActionListener<Response> listener) {
             try (var ignore = threadContext.stashContext()) {
                 threadContext.markAsSystemContext();
                 if (request.remoteClusterServer) {

+ 11 - 2
server/src/test/java/org/elasticsearch/action/admin/cluster/remote/RemoteClusterNodesActionTests.java

@@ -46,6 +46,7 @@ import java.util.stream.Collectors;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -114,6 +115,7 @@ public class RemoteClusterNodesActionTests extends ESTestCase {
                     Request request,
                     ActionListener<Response> listener
                 ) {
+                    assertThat(threadContext.isSystemContext(), is(true));
                     assertSame(TransportNodesInfoAction.TYPE, action);
                     assertThat(
                         asInstanceOf(NodesInfoRequest.class, request).requestedMetrics(),
@@ -128,7 +130,10 @@ public class RemoteClusterNodesActionTests extends ESTestCase {
         );
 
         final PlainActionFuture<RemoteClusterNodesAction.Response> future = new PlainActionFuture<>();
-        action.doExecute(mock(Task.class), RemoteClusterNodesAction.Request.REMOTE_CLUSTER_SERVER_NODES, future);
+        action.doExecute(mock(Task.class), RemoteClusterNodesAction.Request.REMOTE_CLUSTER_SERVER_NODES, ActionListener.wrap(response -> {
+            assertThat(threadContext.isSystemContext(), is(false));
+            future.onResponse(response);
+        }, future::onFailure));
 
         final List<DiscoveryNode> actualNodes = future.actionGet().getNodes();
         assertThat(Set.copyOf(actualNodes), equalTo(expectedRemoteServerNodes));
@@ -191,6 +196,7 @@ public class RemoteClusterNodesActionTests extends ESTestCase {
                     Request request,
                     ActionListener<Response> listener
                 ) {
+                    assertThat(threadContext.isSystemContext(), is(true));
                     assertSame(TransportNodesInfoAction.TYPE, action);
                     assertThat(asInstanceOf(NodesInfoRequest.class, request).requestedMetrics(), empty());
                     listener.onResponse((Response) nodesInfoResponse);
@@ -202,7 +208,10 @@ public class RemoteClusterNodesActionTests extends ESTestCase {
         );
 
         final PlainActionFuture<RemoteClusterNodesAction.Response> future = new PlainActionFuture<>();
-        action.doExecute(mock(Task.class), RemoteClusterNodesAction.Request.ALL_NODES, future);
+        action.doExecute(mock(Task.class), RemoteClusterNodesAction.Request.ALL_NODES, ActionListener.wrap(response -> {
+            assertThat(threadContext.isSystemContext(), is(false));
+            future.onResponse(response);
+        }, future::onFailure));
 
         final List<DiscoveryNode> actualNodes = future.actionGet().getNodes();
         assertThat(Set.copyOf(actualNodes), equalTo(expectedRemoteNodes));