Browse Source

Force execute inactive sink reaper (#109632)

Like the SearchService#Reaper, the InactiveSinksReaper in ESQL should 
use force execution.

Relates #106544
Nhat Nguyen 1 year ago
parent
commit
c61bdd8121

+ 5 - 0
docs/changelog/109632.yaml

@@ -0,0 +1,5 @@
+pr: 109632
+summary: Force execute inactive sink reaper
+area: ES|QL
+type: bug
+issues: []

+ 40 - 17
x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

@@ -15,12 +15,12 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockStreamInput;
 import org.elasticsearch.core.Nullable;
@@ -60,6 +60,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
      * removed from the exchange service if no sinks are attached (i.e., no computation uses that sink handler).
      */
     public static final String INACTIVE_SINKS_INTERVAL_SETTING = "esql.exchange.sink_inactive_interval";
+    public static final TimeValue INACTIVE_SINKS_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5);
 
     private static final Logger LOGGER = LogManager.getLogger(ExchangeService.class);
 
@@ -69,14 +70,17 @@ public final class ExchangeService extends AbstractLifecycleComponent {
 
     private final Map<String, ExchangeSinkHandler> sinks = ConcurrentCollections.newConcurrentMap();
 
-    private final InactiveSinksReaper inactiveSinksReaper;
-
     public ExchangeService(Settings settings, ThreadPool threadPool, String executorName, BlockFactory blockFactory) {
         this.threadPool = threadPool;
         this.executor = threadPool.executor(executorName);
         this.blockFactory = blockFactory;
-        final var inactiveInterval = settings.getAsTime(INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMinutes(5));
-        this.inactiveSinksReaper = new InactiveSinksReaper(LOGGER, threadPool, this.executor, inactiveInterval);
+        final var inactiveInterval = settings.getAsTime(INACTIVE_SINKS_INTERVAL_SETTING, INACTIVE_SINKS_INTERVAL_DEFAULT);
+        // Run the reaper every half of the keep_alive interval
+        this.threadPool.scheduleWithFixedDelay(
+            new InactiveSinksReaper(LOGGER, threadPool, inactiveInterval),
+            TimeValue.timeValueMillis(Math.max(1, inactiveInterval.millis() / 2)),
+            executor
+        );
     }
 
     public void registerTransportHandler(TransportService transportService) {
@@ -206,23 +210,42 @@ public final class ExchangeService extends AbstractLifecycleComponent {
         }
     }
 
-    private final class InactiveSinksReaper extends AbstractAsyncTask {
-        InactiveSinksReaper(Logger logger, ThreadPool threadPool, Executor executor, TimeValue interval) {
-            super(logger, threadPool, executor, interval, true);
-            rescheduleIfNecessary();
+    private final class InactiveSinksReaper extends AbstractRunnable {
+        private final Logger logger;
+        private final TimeValue keepAlive;
+        private final ThreadPool threadPool;
+
+        InactiveSinksReaper(Logger logger, ThreadPool threadPool, TimeValue keepAlive) {
+            this.logger = logger;
+            this.keepAlive = keepAlive;
+            this.threadPool = threadPool;
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            logger.error("unexpected error when closing inactive sinks", e);
+            assert false : e;
+        }
+
+        @Override
+        public void onRejection(Exception e) {
+            if (e instanceof EsRejectedExecutionException esre && esre.isExecutorShutdown()) {
+                logger.debug("rejected execution when closing inactive sinks");
+            } else {
+                onFailure(e);
+            }
         }
 
         @Override
-        protected boolean mustReschedule() {
-            Lifecycle.State state = lifecycleState();
-            return state != Lifecycle.State.STOPPED && state != Lifecycle.State.CLOSED;
+        public boolean isForceExecution() {
+            // mustn't reject this task even if the queue is full
+            return true;
         }
 
         @Override
-        protected void runInternal() {
+        protected void doRun() {
             assert Transports.assertNotTransportThread("reaping inactive exchanges can be expensive");
             assert ThreadPool.assertNotScheduleThread("reaping inactive exchanges can be expensive");
-            final TimeValue maxInterval = getInterval();
             final long nowInMillis = threadPool.relativeTimeInMillis();
             for (Map.Entry<String, ExchangeSinkHandler> e : sinks.entrySet()) {
                 ExchangeSinkHandler sink = e.getValue();
@@ -230,7 +253,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
                     continue;
                 }
                 long elapsed = nowInMillis - sink.lastUpdatedTimeInMillis();
-                if (elapsed > maxInterval.millis()) {
+                if (elapsed > keepAlive.millis()) {
                     finishSinkHandler(
                         e.getKey(),
                         new ElasticsearchTimeoutException(
@@ -320,7 +343,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
 
     @Override
     protected void doStop() {
-        inactiveSinksReaper.close();
+
     }
 
     @Override