Bläddra i källkod

TransportNodesAction shouldn't hold to cluster state

Long running TransportNodesAction requests can retain old cluster states in memory for much longer than needed. This can cause nodes with frequent cluster state updates and long running requests to run out of memory.
Igor Motov 10 år sedan
förälder
incheckning
ea99d97d8f

+ 9 - 3
core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java

@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.*;
@@ -95,17 +96,22 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
 
         private final NodesRequest request;
         private final String[] nodesIds;
+        private final DiscoveryNode[] nodes;
         private final ActionListener<NodesResponse> listener;
-        private final ClusterState clusterState;
         private final AtomicReferenceArray<Object> responses;
         private final AtomicInteger counter = new AtomicInteger();
 
         private AsyncAction(NodesRequest request, ActionListener<NodesResponse> listener) {
             this.request = request;
             this.listener = listener;
-            clusterState = clusterService.state();
+            ClusterState clusterState = clusterService.state();
             String[] nodesIds = resolveNodes(request, clusterState);
             this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
+            ImmutableOpenMap<String, DiscoveryNode> nodes = clusterState.nodes().nodes();
+            this.nodes = new DiscoveryNode[nodesIds.length];
+            for (int i = 0; i < nodesIds.length; i++) {
+                this.nodes[i] = nodes.get(nodesIds[i]);
+            }
             this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
         }
 
@@ -128,7 +134,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
             for (int i = 0; i < nodesIds.length; i++) {
                 final String nodeId = nodesIds[i];
                 final int idx = i;
-                final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId);
+                final DiscoveryNode node = nodes[i];
                 try {
                     if (node == null) {
                         onFailure(idx, nodeId, new NoSuchNodeException(nodeId));