Ver Fonte

Field capabilities index action should not fork its execution (#69865)

This commit removes the usage of the management thread pool to execute the field
capabilities index action. This action (similar to can_match) is cheap so it can
be executed on the same thread (network).
Jim Ferenczi há 4 anos atrás
pai
commit
d4599eefa4

+ 68 - 49
server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

@@ -59,8 +59,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
         // retrieve the initial timestamp in case the action is a cross cluster search
         long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis();
         final ClusterState clusterState = clusterService.state();
-        final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(request.indicesOptions(),
-            request.indices());
+        final Map<String, OriginalIndices> remoteClusterIndices =
+            remoteClusterService.groupIndices(request.indicesOptions(), request.indices());
         final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
         final String[] concreteIndices;
         if (localIndices == null) {
@@ -70,62 +70,81 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
             concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices);
         }
         final int totalNumRequest = concreteIndices.length + remoteClusterIndices.size();
-        final CountDown completionCounter = new CountDown(totalNumRequest);
-        final List<FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedList(new ArrayList<>());
-        final Runnable onResponse = () -> {
-            if (completionCounter.countDown()) {
-                if (request.isMergeResults()) {
-                    listener.onResponse(merge(indexResponses, request.includeUnmapped()));
-                } else {
-                    listener.onResponse(new FieldCapabilitiesResponse(indexResponses));
-                }
-            }
-        };
         if (totalNumRequest == 0) {
             listener.onResponse(new FieldCapabilitiesResponse(new String[0], Collections.emptyMap()));
-        } else {
-            ActionListener<FieldCapabilitiesIndexResponse> innerListener = new ActionListener<FieldCapabilitiesIndexResponse>() {
-                @Override
-                public void onResponse(FieldCapabilitiesIndexResponse result) {
-                    if (result.canMatch()) {
-                        indexResponses.add(result);
+            return;
+        }
+
+        final CountDown completionCounter = new CountDown(totalNumRequest);
+        final List<FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedList(new ArrayList<>());
+        final ActionListener<List<FieldCapabilitiesIndexResponse>> countDownListener = new ActionListener<>() {
+            @Override
+            public void onResponse(List<FieldCapabilitiesIndexResponse> results) {
+                for (FieldCapabilitiesIndexResponse res : results) {
+                    if (res.canMatch()) {
+                        indexResponses.add(res);
                     }
-                    onResponse.run();
                 }
+                countDown();
+            }
 
-                @Override
-                public void onFailure(Exception e) {
-                    // TODO we should somehow inform the user that we failed
-                    onResponse.run();
-                }
-            };
-            for (String index : concreteIndices) {
-                client.executeLocally(TransportFieldCapabilitiesIndexAction.TYPE, new FieldCapabilitiesIndexRequest(request.fields(),
-                    index, localIndices, request.indexFilter(), nowInMillis, request.runtimeFields()), innerListener);
+            @Override
+            public void onFailure(Exception e) {
+                // TODO we should somehow inform the user that we failed
+                countDown();
             }
 
-            // this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
-            // send us back all individual index results.
-            for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
-                String clusterAlias = remoteIndices.getKey();
-                OriginalIndices originalIndices = remoteIndices.getValue();
-                Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
-                FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
-                remoteRequest.setMergeResults(false); // we need to merge on this node
-                remoteRequest.indicesOptions(originalIndices.indicesOptions());
-                remoteRequest.indices(originalIndices.indices());
-                remoteRequest.fields(request.fields());
-                remoteRequest.runtimeFields(request.runtimeFields());
-                remoteRequest.indexFilter(request.indexFilter());
-                remoteRequest.nowInMillis(nowInMillis);
-                remoteClusterClient.fieldCaps(remoteRequest,  ActionListener.wrap(response -> {
-                    for (FieldCapabilitiesIndexResponse res : response.getIndexResponses()) {
-                        indexResponses.add(new FieldCapabilitiesIndexResponse(RemoteClusterAware.
-                            buildRemoteIndexName(clusterAlias, res.getIndexName()), res.get(), res.canMatch()));
+            private void countDown() {
+                if (completionCounter.countDown()) {
+                    if (request.isMergeResults()) {
+                        listener.onResponse(merge(indexResponses, request.includeUnmapped()));
+                    } else {
+                        listener.onResponse(new FieldCapabilitiesResponse(indexResponses));
                     }
-                    onResponse.run();
-                }, failure -> onResponse.run()));
+                }
             }
+        };
+
+        for (String index : concreteIndices) {
+            client.executeLocally(TransportFieldCapabilitiesIndexAction.TYPE,
+                new FieldCapabilitiesIndexRequest(
+                    request.fields(),
+                    index,
+                    localIndices,
+                    request.indexFilter(),
+                    nowInMillis, request.runtimeFields()
+                ),
+                ActionListener.wrap(
+                    response -> countDownListener.onResponse(Collections.singletonList(response)),
+                    countDownListener::onFailure
+                )
+            );
+        }
+
+        // this is the cross cluster part of this API - we force the other cluster to not merge the results but instead
+        // send us back all individual index results.
+        for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
+            String clusterAlias = remoteIndices.getKey();
+            OriginalIndices originalIndices = remoteIndices.getValue();
+            Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
+            FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
+            remoteRequest.setMergeResults(false); // we need to merge on this node
+            remoteRequest.indicesOptions(originalIndices.indicesOptions());
+            remoteRequest.indices(originalIndices.indices());
+            remoteRequest.fields(request.fields());
+            remoteRequest.runtimeFields(request.runtimeFields());
+            remoteRequest.indexFilter(request.indexFilter());
+            remoteRequest.nowInMillis(nowInMillis);
+            remoteClusterClient.fieldCaps(remoteRequest,
+                ActionListener.wrap(response -> {
+                    List<FieldCapabilitiesIndexResponse> remotes = new ArrayList<>();
+                    for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
+                        remotes.add(new FieldCapabilitiesIndexResponse(
+                            RemoteClusterAware.buildRemoteIndexName(clusterAlias, resp.getIndexName()),
+                            resp.get(), resp.canMatch()));
+                    }
+                    countDownListener.onResponse(remotes);
+                }, countDownListener::onFailure));
         }
     }
 

+ 12 - 11
server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java

@@ -12,7 +12,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.support.ActionFilters;
@@ -21,7 +20,6 @@ import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
-import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
@@ -60,7 +58,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executor;
 import java.util.function.Predicate;
 
 import static org.elasticsearch.action.support.TransportActions.isShardNotAvailableException;
@@ -77,20 +74,17 @@ public class TransportFieldCapabilitiesIndexAction
 
     private final ClusterService clusterService;
     private final TransportService transportService;
-    private final SearchService searchService;
     private final IndicesService indicesService;
-    private final Executor executor;
 
     @Inject
-    public TransportFieldCapabilitiesIndexAction(ClusterService clusterService, TransportService transportService,
-                                                 IndicesService indicesService, SearchService searchService, ThreadPool threadPool,
-                                                 ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+    public TransportFieldCapabilitiesIndexAction(ClusterService clusterService,
+                                                 TransportService transportService,
+                                                 IndicesService indicesService,
+                                                 ActionFilters actionFilters) {
         super(ACTION_NAME, transportService, actionFilters, FieldCapabilitiesIndexRequest::new);
         this.clusterService = clusterService;
         this.transportService = transportService;
-        this.searchService = searchService;
         this.indicesService = indicesService;
-        this.executor = threadPool.executor(ThreadPool.Names.MANAGEMENT);
         transportService.registerRequestHandler(ACTION_SHARD_NAME, ThreadPool.Names.SAME,
             FieldCapabilitiesIndexRequest::new, new ShardTransportHandler());
     }
@@ -305,7 +299,14 @@ public class TransportFieldCapabilitiesIndexAction
                 logger.trace("executing [{}]", request);
             }
             ActionListener<FieldCapabilitiesIndexResponse> listener = new ChannelActionListener<>(channel, ACTION_SHARD_NAME, request);
-            executor.execute(ActionRunnable.supply(listener, () -> shardOperation(request)));
+            final FieldCapabilitiesIndexResponse resp;
+            try {
+                resp = shardOperation(request);
+            } catch (Exception exc) {
+                listener.onFailure(exc);
+                return;
+            }
+            listener.onResponse(resp);
         }
     }
 }

+ 35 - 6
x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java

@@ -30,12 +30,16 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.ESIntegTestCase;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
 import static org.hamcrest.Matchers.equalTo;
@@ -128,6 +132,8 @@ public abstract class AbstractEqlBlockingIntegTestCase extends AbstractEqlIntegT
 
         private final String nodeId;
 
+        private final ExecutorService executorService = Executors.newFixedThreadPool(1);
+
         public void reset() {
             contexts.set(0);
             fieldCaps.set(0);
@@ -183,11 +189,14 @@ public abstract class AbstractEqlBlockingIntegTestCase extends AbstractEqlIntegT
 
                 @Override
                 public <Request extends ActionRequest, Response extends ActionResponse> void apply(
-                    Task task, String action, Request request, ActionListener<Response> listener,
+                    Task task,
+                    String action,
+                    Request request,
+                    ActionListener<Response> listener,
                     ActionFilterChain<Request, Response> chain) {
-                    ActionListener<Response> listenerWrapper = listener;
+
                     if (action.equals(FieldCapabilitiesAction.NAME)) {
-                        listenerWrapper = ActionListener.wrap(resp -> {
+                        final Consumer<Response> actionWrapper = resp -> {
                             try {
                                 fieldCaps.incrementAndGet();
                                 logger.trace("blocking field caps on " + nodeId);
@@ -198,14 +207,34 @@ public abstract class AbstractEqlBlockingIntegTestCase extends AbstractEqlIntegT
                             } finally {
                                 listener.onResponse(resp);
                             }
-                        }, listener::onFailure);
-
+                            logger.trace("unblocking field caps on " + nodeId);
+                        };
+                        final Thread originalThread = Thread.currentThread();
+                        chain.proceed(task, action, request,
+                            ActionListener.wrap(
+                                resp -> {
+                                    if (originalThread == Thread.currentThread()) {
+                                        // async if we never exited the original thread
+                                        executorService.execute(() -> actionWrapper.accept(resp));
+                                    } else {
+                                        actionWrapper.accept(resp);
+                                    }
+                                },
+                                listener::onFailure)
+                        );
+                    } else {
+                        chain.proceed(task, action, request, listener);
                     }
-                    chain.proceed(task, action, request, listenerWrapper);
                 }
             });
             return list;
         }
+
+        @Override
+        public void close() throws IOException {
+            List<Runnable> runnables = executorService.shutdownNow();
+            assertTrue(runnables.isEmpty());
+        }
     }
 
     @Override