|
@@ -31,6 +31,7 @@ import org.elasticsearch.compute.operator.DriverProfile;
|
|
|
import org.elasticsearch.compute.operator.DriverTaskRunner;
|
|
|
import org.elasticsearch.compute.operator.ResponseHeadersCollector;
|
|
|
import org.elasticsearch.compute.operator.exchange.ExchangeService;
|
|
|
+import org.elasticsearch.compute.operator.exchange.ExchangeSink;
|
|
|
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
|
|
|
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
|
|
|
import org.elasticsearch.core.IOUtils;
|
|
@@ -369,7 +370,7 @@ public class ComputeService {
|
|
|
}
|
|
|
|
|
|
void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<List<DriverProfile>> listener) {
|
|
|
- listener = ActionListener.runAfter(listener, () -> Releasables.close(context.searchContexts));
|
|
|
+ listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts));
|
|
|
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts.size());
|
|
|
for (int i = 0; i < context.searchContexts.size(); i++) {
|
|
|
SearchContext searchContext = context.searchContexts.get(i);
|
|
@@ -457,6 +458,8 @@ public class ComputeService {
|
|
|
aliasFilter,
|
|
|
clusterAlias
|
|
|
);
|
|
|
+ // TODO: `searchService.createSearchContext` allows opening search contexts without limits,
|
|
|
+ // we need to limit the number of active search contexts here or in SearchService
|
|
|
SearchContext context = searchService.createSearchContext(shardRequest, SearchService.NO_TIMEOUT);
|
|
|
searchContexts.add(context);
|
|
|
}
|
|
@@ -576,46 +579,94 @@ public class ComputeService {
|
|
|
// TODO: Use an internal action here
|
|
|
public static final String DATA_ACTION_NAME = EsqlQueryAction.NAME + "/data";
|
|
|
|
|
|
- private class DataNodeRequestHandler implements TransportRequestHandler<DataNodeRequest> {
|
|
|
- @Override
|
|
|
- public void messageReceived(DataNodeRequest request, TransportChannel channel, Task task) {
|
|
|
- final var parentTask = (CancellableTask) task;
|
|
|
- final var sessionId = request.sessionId();
|
|
|
- final var exchangeSink = exchangeService.getSinkHandler(sessionId);
|
|
|
+ private class DataNodeRequestExecutor {
|
|
|
+ private final DataNodeRequest request;
|
|
|
+ private final CancellableTask parentTask;
|
|
|
+ private final ExchangeSinkHandler exchangeSink;
|
|
|
+ private final ActionListener<ComputeResponse> listener;
|
|
|
+ private final List<DriverProfile> driverProfiles;
|
|
|
+ private final int maxConcurrentShards;
|
|
|
+ private final ExchangeSink blockingSink; // block until we have completed on all shards or the coordinator has enough data
|
|
|
+
|
|
|
+ DataNodeRequestExecutor(
|
|
|
+ DataNodeRequest request,
|
|
|
+ CancellableTask parentTask,
|
|
|
+ ExchangeSinkHandler exchangeSink,
|
|
|
+ int maxConcurrentShards,
|
|
|
+ ActionListener<ComputeResponse> listener
|
|
|
+ ) {
|
|
|
+ this.request = request;
|
|
|
+ this.parentTask = parentTask;
|
|
|
+ this.exchangeSink = exchangeSink;
|
|
|
+ this.listener = listener;
|
|
|
+ this.driverProfiles = request.configuration().profile() ? Collections.synchronizedList(new ArrayList<>()) : List.of();
|
|
|
+ this.maxConcurrentShards = maxConcurrentShards;
|
|
|
+ this.blockingSink = exchangeSink.createExchangeSink();
|
|
|
+ }
|
|
|
+
|
|
|
+ void start() {
|
|
|
parentTask.addListener(
|
|
|
- () -> exchangeService.finishSinkHandler(sessionId, new TaskCancelledException(parentTask.getReasonCancelled()))
|
|
|
+ () -> exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(parentTask.getReasonCancelled()))
|
|
|
);
|
|
|
- final ActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel);
|
|
|
+ runBatch(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void runBatch(int startBatchIndex) {
|
|
|
final EsqlConfiguration configuration = request.configuration();
|
|
|
- String clusterAlias = request.clusterAlias();
|
|
|
- acquireSearchContexts(
|
|
|
- clusterAlias,
|
|
|
- request.shardIds(),
|
|
|
- configuration,
|
|
|
- request.aliasFilters(),
|
|
|
- ActionListener.wrap(searchContexts -> {
|
|
|
- assert ThreadPool.assertCurrentThreadPool(ESQL_THREAD_POOL_NAME);
|
|
|
- var computeContext = new ComputeContext(sessionId, clusterAlias, searchContexts, configuration, null, exchangeSink);
|
|
|
- runCompute(parentTask, computeContext, request.plan(), ActionListener.wrap(driverProfiles -> {
|
|
|
- // don't return until all pages are fetched
|
|
|
- exchangeSink.addCompletionListener(
|
|
|
- ContextPreservingActionListener.wrapPreservingContext(
|
|
|
- ActionListener.releaseAfter(
|
|
|
- listener.map(nullValue -> new ComputeResponse(driverProfiles)),
|
|
|
- () -> exchangeService.finishSinkHandler(sessionId, null)
|
|
|
- ),
|
|
|
- transportService.getThreadPool().getThreadContext()
|
|
|
- )
|
|
|
- );
|
|
|
- }, e -> {
|
|
|
- exchangeService.finishSinkHandler(sessionId, e);
|
|
|
- listener.onFailure(e);
|
|
|
- }));
|
|
|
- }, e -> {
|
|
|
- exchangeService.finishSinkHandler(sessionId, e);
|
|
|
- listener.onFailure(e);
|
|
|
- })
|
|
|
+ final String clusterAlias = request.clusterAlias();
|
|
|
+ final var sessionId = request.sessionId();
|
|
|
+ final int endBatchIndex = Math.min(startBatchIndex + maxConcurrentShards, request.shardIds().size());
|
|
|
+ List<ShardId> shardIds = request.shardIds().subList(startBatchIndex, endBatchIndex);
|
|
|
+ acquireSearchContexts(clusterAlias, shardIds, configuration, request.aliasFilters(), ActionListener.wrap(searchContexts -> {
|
|
|
+ assert ThreadPool.assertCurrentThreadPool(ESQL_THREAD_POOL_NAME, ESQL_WORKER_THREAD_POOL_NAME);
|
|
|
+ var computeContext = new ComputeContext(sessionId, clusterAlias, searchContexts, configuration, null, exchangeSink);
|
|
|
+ runCompute(
|
|
|
+ parentTask,
|
|
|
+ computeContext,
|
|
|
+ request.plan(),
|
|
|
+ ActionListener.wrap(profiles -> onBatchCompleted(endBatchIndex, profiles), this::onFailure)
|
|
|
+ );
|
|
|
+ }, this::onFailure));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void onBatchCompleted(int lastBatchIndex, List<DriverProfile> batchProfiles) {
|
|
|
+ if (request.configuration().profile()) {
|
|
|
+ driverProfiles.addAll(batchProfiles);
|
|
|
+ }
|
|
|
+ if (lastBatchIndex < request.shardIds().size() && exchangeSink.isFinished() == false) {
|
|
|
+ runBatch(lastBatchIndex);
|
|
|
+ } else {
|
|
|
+ blockingSink.finish();
|
|
|
+ // don't return until all pages are fetched
|
|
|
+ exchangeSink.addCompletionListener(
|
|
|
+ ContextPreservingActionListener.wrapPreservingContext(
|
|
|
+ ActionListener.runBefore(
|
|
|
+ listener.map(nullValue -> new ComputeResponse(driverProfiles)),
|
|
|
+ () -> exchangeService.finishSinkHandler(request.sessionId(), null)
|
|
|
+ ),
|
|
|
+ transportService.getThreadPool().getThreadContext()
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void onFailure(Exception e) {
|
|
|
+ exchangeService.finishSinkHandler(request.sessionId(), e);
|
|
|
+ listener.onFailure(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class DataNodeRequestHandler implements TransportRequestHandler<DataNodeRequest> {
|
|
|
+ @Override
|
|
|
+ public void messageReceived(DataNodeRequest request, TransportChannel channel, Task task) {
|
|
|
+ DataNodeRequestExecutor executor = new DataNodeRequestExecutor(
|
|
|
+ request,
|
|
|
+ (CancellableTask) task,
|
|
|
+ exchangeService.getSinkHandler(request.sessionId()),
|
|
|
+ request.configuration().pragmas().maxConcurrentShardsPerNode(),
|
|
|
+ new ChannelActionListener<>(channel)
|
|
|
);
|
|
|
+ executor.start();
|
|
|
}
|
|
|
}
|
|
|
|