Sfoglia il codice sorgente

Try to finish remote sink once (#117592) (#117666)

Currently, we have three clients fetching pages by default, each with 
its own lifecycle. This can result in scenarios where more than one
request is sent to complete the remote sink. While this does not cause
correctness issues, it is inefficient, especially for cross-cluster
requests. This change tracks the status of the remote sink and tries to
send only one finish request per remote sink.
Nhat Nguyen 11 mesi fa
parent
commit
4c937cbea3

+ 28 - 0
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

@@ -42,6 +42,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -292,6 +293,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
         final Executor responseExecutor;
 
         final AtomicLong estimatedPageSizeInBytes = new AtomicLong(0L);
+        final AtomicBoolean finished = new AtomicBoolean(false);
 
         TransportRemoteSink(
             TransportService transportService,
@@ -311,6 +313,32 @@ public final class ExchangeService extends AbstractLifecycleComponent {
 
         @Override
         public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
+            if (allSourcesFinished) {
+                if (finished.compareAndSet(false, true)) {
+                    doFetchPageAsync(true, listener);
+                } else {
+                    // already finished or promised
+                    listener.onResponse(new ExchangeResponse(blockFactory, null, true));
+                }
+            } else {
+                // already finished
+                if (finished.get()) {
+                    listener.onResponse(new ExchangeResponse(blockFactory, null, true));
+                    return;
+                }
+                doFetchPageAsync(false, ActionListener.wrap(r -> {
+                    if (r.finished()) {
+                        finished.set(true);
+                    }
+                    listener.onResponse(r);
+                }, e -> {
+                    finished.set(true);
+                    listener.onFailure(e);
+                }));
+            }
+        }
+
+        private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
             final long reservedBytes = allSourcesFinished ? 0 : estimatedPageSizeInBytes.get();
             if (reservedBytes > 0) {
                 // This doesn't fully protect ESQL from OOM, but reduces the likelihood.

+ 9 - 0
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java

@@ -449,6 +449,15 @@ public class ExchangeServiceTests extends ESTestCase {
         ExchangeService exchange1 = new ExchangeService(Settings.EMPTY, threadPool, ESQL_TEST_EXECUTOR, blockFactory());
         exchange1.registerTransportHandler(node1);
         AbstractSimpleTransportTestCase.connectToNode(node0, node1.getLocalNode());
+        Set<String> finishingRequests = ConcurrentCollections.newConcurrentSet();
+        node1.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
+            final ExchangeRequest exchangeRequest = (ExchangeRequest) request;
+            if (exchangeRequest.sourcesFinished()) {
+                String exchangeId = exchangeRequest.exchangeId();
+                assertTrue("tried to finish [" + exchangeId + "] twice", finishingRequests.add(exchangeId));
+            }
+            handler.messageReceived(request, channel, task);
+        });
 
         try (exchange0; exchange1; node0; node1) {
             String exchangeId = "exchange";