|
@@ -38,6 +38,7 @@ import org.elasticsearch.transport.Transports;
|
|
|
import java.io.IOException;
|
|
|
import java.util.Map;
|
|
|
import java.util.concurrent.Executor;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
/**
|
|
|
* {@link ExchangeService} is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
|
|
@@ -237,17 +238,40 @@ public final class ExchangeService extends AbstractLifecycleComponent {
|
|
|
return new TransportRemoteSink(transportService, blockFactory, conn, parentTask, exchangeId, executor);
|
|
|
}
|
|
|
|
|
|
- record TransportRemoteSink(
|
|
|
- TransportService transportService,
|
|
|
- BlockFactory blockFactory,
|
|
|
- Transport.Connection connection,
|
|
|
- Task parentTask,
|
|
|
- String exchangeId,
|
|
|
- Executor responseExecutor
|
|
|
- ) implements RemoteSink {
|
|
|
+ static final class TransportRemoteSink implements RemoteSink {
|
|
|
+ final TransportService transportService;
|
|
|
+ final BlockFactory blockFactory;
|
|
|
+ final Transport.Connection connection;
|
|
|
+ final Task parentTask;
|
|
|
+ final String exchangeId;
|
|
|
+ final Executor responseExecutor;
|
|
|
+
|
|
|
+ final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
|
|
|
+
|
|
|
+ TransportRemoteSink(
|
|
|
+ TransportService transportService,
|
|
|
+ BlockFactory blockFactory,
|
|
|
+ Transport.Connection connection,
|
|
|
+ Task parentTask,
|
|
|
+ String exchangeId,
|
|
|
+ Executor responseExecutor
|
|
|
+ ) {
|
|
|
+ this.transportService = transportService;
|
|
|
+ this.blockFactory = blockFactory;
|
|
|
+ this.connection = connection;
|
|
|
+ this.parentTask = parentTask;
|
|
|
+ this.exchangeId = exchangeId;
|
|
|
+ this.responseExecutor = responseExecutor;
|
|
|
+ }
|
|
|
|
|
|
@Override
|
|
|
public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
|
|
|
+ final long reservedBytes = estimatedPageSizeInBytes.get();
|
|
|
+ if (reservedBytes > 0) {
|
|
|
+ // This doesn't fully protect ESQL from OOM, but reduces the likelihood.
|
|
|
+ blockFactory.breaker().addEstimateBytesAndMaybeBreak(reservedBytes, "fetch page");
|
|
|
+ listener = ActionListener.runAfter(listener, () -> blockFactory.breaker().addWithoutBreaking(-reservedBytes));
|
|
|
+ }
|
|
|
transportService.sendChildRequest(
|
|
|
connection,
|
|
|
EXCHANGE_ACTION_NAME,
|
|
@@ -256,7 +280,10 @@ public final class ExchangeService extends AbstractLifecycleComponent {
|
|
|
TransportRequestOptions.EMPTY,
|
|
|
new ActionListenerResponseHandler<>(listener, in -> {
|
|
|
try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) {
|
|
|
- return new ExchangeResponse(bsi);
|
|
|
+ final ExchangeResponse resp = new ExchangeResponse(bsi);
|
|
|
+ final long responseBytes = resp.ramBytesUsedByPage();
|
|
|
+ estimatedPageSizeInBytes.getAndUpdate(curr -> Math.max(responseBytes, curr / 2));
|
|
|
+ return resp;
|
|
|
}
|
|
|
}, responseExecutor)
|
|
|
);
|