Browse Source

Start polling after data computation is started (#128575)

Ievgen Degtiarenko 4 months ago
parent
commit
03173af0c7

+ 8 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

@@ -125,14 +125,6 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
                     updateExecutionInfo(executionInfo, clusterAlias, finalResponse.get());
                     return completionInfo;
                 }))) {
-                    var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
-                    exchangeSource.addRemoteSink(
-                        remoteSink,
-                        failFast,
-                        () -> pagesFetched.set(true),
-                        queryPragmas.concurrentExchangeClients(),
-                        computeListener.acquireAvoid()
-                    );
                     var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices);
                     var clusterRequest = new ClusterComputeRequest(clusterAlias, childSessionId, configuration, remotePlan);
                     final ActionListener<ComputeResponse> clusterListener = computeListener.acquireCompute().map(r -> {
@@ -147,6 +139,14 @@ final class ClusterComputeHandler implements TransportRequestHandler<ClusterComp
                         TransportRequestOptions.EMPTY,
                         new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor)
                     );
+                    var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, cluster.connection);
+                    exchangeSource.addRemoteSink(
+                        remoteSink,
+                        failFast,
+                        () -> pagesFetched.set(true),
+                        queryPragmas.concurrentExchangeClients(),
+                        computeListener.acquireAvoid()
+                    );
                 }
             })
         );

+ 8 - 8
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

@@ -176,14 +176,6 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
                         try (
                             var computeListener = new ComputeListener(threadPool, onGroupFailure, l.map(ignored -> nodeResponseRef.get()))
                         ) {
-                            final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
-                            exchangeSource.addRemoteSink(
-                                remoteSink,
-                                configuration.allowPartialResults() == false,
-                                pagesFetched::incrementAndGet,
-                                queryPragmas.concurrentExchangeClients(),
-                                computeListener.acquireAvoid()
-                            );
                             final boolean sameNode = transportService.getLocalNode().getId().equals(connection.getNode().getId());
                             var dataNodeRequest = new DataNodeRequest(
                                 childSessionId,
@@ -207,6 +199,14 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
                                     return r.completionInfo();
                                 }), DataNodeComputeResponse::new, esqlExecutor)
                             );
+                            final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
+                            exchangeSource.addRemoteSink(
+                                remoteSink,
+                                configuration.allowPartialResults() == false,
+                                pagesFetched::incrementAndGet,
+                                queryPragmas.concurrentExchangeClients(),
+                                computeListener.acquireAvoid()
+                            );
                         }
                     })
                 );