Browse Source

Dispatch ClusterStateAction#buildResponse to executor (#103435)

If waiting for a particular cluster state (e.g. on the CCR leader) we
will compute the resulting cluster state and serialize it on the cluster
applier thread, which can be too expensive in a large cluster for this
thread. With this commit we dispatch the final action back to the
original executor.
David Turner 1 year ago
parent
commit
92babbb217

+ 5 - 0
docs/changelog/103435.yaml

@@ -0,0 +1,5 @@
+pr: 103435
+summary: Dispatch `ClusterStateAction#buildResponse` to executor
+area: Distributed
+type: bug
+issues: []

+ 4 - 1
server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

@@ -11,6 +11,7 @@ package org.elasticsearch.action.admin.cluster.state;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
 import org.elasticsearch.cluster.ClusterState;
@@ -112,7 +113,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
                         }
 
                         if (acceptableClusterStatePredicate.test(newState)) {
-                            ActionListener.completeWith(listener, () -> buildResponse(request, newState));
+                            executor.execute(ActionRunnable.supply(listener, () -> buildResponse(request, newState)));
                         } else {
                             listener.onFailure(
                                 new NotMasterException(
@@ -150,6 +151,8 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
     }
 
     private ClusterStateResponse buildResponse(final ClusterStateRequest request, final ClusterState currentState) {
+        ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT); // too heavy to construct & serialize cluster state without forking
+
         logger.trace("Serving cluster state request using version {}", currentState.version());
         ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
         builder.version(currentState.version());