|
@@ -194,39 +194,52 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
|
|
|
/**
|
|
|
* Collects all nodes on the connected cluster and returns / passes a nodeID to {@link DiscoveryNode} lookup function
|
|
|
* that returns <code>null</code> if the node ID is not found.
|
|
|
+ *
|
|
|
+ * The requests to get cluster state on the connected cluster are made in the system context because logically
|
|
|
+ * they are equivalent to checking a single detail in the local cluster state and should not require that the
|
|
|
+ * user who made the request that is using this method in its implementation is authorized to view the entire
|
|
|
+ * cluster state.
|
|
|
*/
|
|
|
void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
|
|
|
Runnable runnable = () -> {
|
|
|
- final ClusterStateRequest request = new ClusterStateRequest();
|
|
|
- request.clear();
|
|
|
- request.nodes(true);
|
|
|
- request.local(true); // run this on the node that gets the request it's as good as any other
|
|
|
- final DiscoveryNode node = getAnyConnectedNode();
|
|
|
- Transport.Connection connection = connectionManager.getConnection(node);
|
|
|
- transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
|
|
|
- new TransportResponseHandler<ClusterStateResponse>() {
|
|
|
-
|
|
|
- @Override
|
|
|
- public ClusterStateResponse read(StreamInput in) throws IOException {
|
|
|
- return new ClusterStateResponse(in);
|
|
|
- }
|
|
|
+ final ThreadContext threadContext = threadPool.getThreadContext();
|
|
|
+ final ContextPreservingActionListener<Function<String, DiscoveryNode>> contextPreservingActionListener =
|
|
|
+ new ContextPreservingActionListener<>(threadContext.newRestorableContext(false), listener);
|
|
|
+ try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
|
|
+ // we stash any context here since this is an internal execution and should not leak any existing context information
|
|
|
+ threadContext.markAsSystemContext();
|
|
|
+
|
|
|
+ final ClusterStateRequest request = new ClusterStateRequest();
|
|
|
+ request.clear();
|
|
|
+ request.nodes(true);
|
|
|
+ request.local(true); // run this on the node that gets the request it's as good as any other
|
|
|
+ final DiscoveryNode node = getAnyConnectedNode();
|
|
|
+ Transport.Connection connection = connectionManager.getConnection(node);
|
|
|
+ transportService.sendRequest(connection, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
|
|
|
+ new TransportResponseHandler<ClusterStateResponse>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ClusterStateResponse read(StreamInput in) throws IOException {
|
|
|
+ return new ClusterStateResponse(in);
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void handleResponse(ClusterStateResponse response) {
|
|
|
- DiscoveryNodes nodes = response.getState().nodes();
|
|
|
- listener.onResponse(nodes::get);
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public void handleResponse(ClusterStateResponse response) {
|
|
|
+ DiscoveryNodes nodes = response.getState().nodes();
|
|
|
+ contextPreservingActionListener.onResponse(nodes::get);
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public void handleException(TransportException exp) {
|
|
|
- listener.onFailure(exp);
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public void handleException(TransportException exp) {
|
|
|
+ contextPreservingActionListener.onFailure(exp);
|
|
|
+ }
|
|
|
|
|
|
- @Override
|
|
|
- public String executor() {
|
|
|
- return ThreadPool.Names.SAME;
|
|
|
- }
|
|
|
- });
|
|
|
+ @Override
|
|
|
+ public String executor() {
|
|
|
+ return ThreadPool.Names.SAME;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
};
|
|
|
try {
|
|
|
// just in case if we are not connected for some reason we try to connect and if we fail we have to notify the listener
|