1
0
Эх сурвалжийг харах

Release operation permit on thread-pool rejection (#25930)

At the shard level we use an operation permit to coordinate between regular shard operations and special operations that need exclusive access. In ES versions < 6, the operation requiring exclusive access was invoked during primary relocation, but ES versions >= 6 this exclusive access is also used when a replica learns about a new primary or when a replica is promoted to primary.

These special operations requiring exclusive access delay regular operations from running, by adding them to a queue, and after finishing the exclusive access, release these operations which then need to be put back on the original thread-pool they were running on. In the presence of thread pool rejections, the current implementation had two issues:

- it would not properly release the operation permit when hitting a rejection (i.e. when calling ThreadedActionListener.onResponse from IndexShardOperationPermits.acquire).
- it would not invoke the onFailure method of the action listener when the shard was closed, and just log a warning instead (see ThreadedActionListener.onFailure), which would ultimately lead to the replication task never being cleaned up (see #25863).

This commit fixes both issues by introducing a custom threaded action listener that is permit-aware and properly deals with rejections.

Closes #25863
Yannick Welsch 8 жил өмнө
parent
commit
620536f850

+ 60 - 5
core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

@@ -20,10 +20,10 @@
 package org.elasticsearch.index.shard;
 
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
-import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -198,11 +198,14 @@ final class IndexShardOperationPermits implements Closeable {
     /**
      * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
      * {@link ActionListener} will be called on the calling thread. During calls of
-     * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener}
-     * will then be called using the provided executor once operations are no longer blocked.
+     * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed.
+     * The {@link ActionListener#onResponse(Object)} method will then be called using the provided executor once operations are no
+     * longer blocked. Note that the executor will not be used for {@link ActionListener#onFailure(Exception)} calls. Those will run
+     * directly on the calling thread, which in case of delays, will be a generic thread. Callers should thus make sure
+     * that the {@link ActionListener#onFailure(Exception)} method provided here only contains lightweight operations.
      *
      * @param onAcquired      {@link ActionListener} that is invoked once acquisition is successful or failed
-     * @param executorOnDelay executor to use for delayed call
+     * @param executorOnDelay executor to use for the possibly delayed {@link ActionListener#onResponse(Object)} call
      * @param forceExecution  whether the runnable should force its execution in case it gets rejected
      */
     public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution) {
@@ -217,7 +220,7 @@ final class IndexShardOperationPermits implements Closeable {
                     final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                     if (executorOnDelay != null) {
                         delayedOperations.add(
-                                new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
+                                new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
                                         new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
                     } else {
                         delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
@@ -269,4 +272,56 @@ final class IndexShardOperationPermits implements Closeable {
         }
     }
 
+    /**
+     * A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool.
+     * Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the
+     * invoker's thread to communicate failures.
+     */
+    private static class PermitAwareThreadedActionListener implements ActionListener<Releasable> {
+
+        private final ThreadPool threadPool;
+        private final String executor;
+        private final ActionListener<Releasable> listener;
+        private final boolean forceExecution;
+
+        private PermitAwareThreadedActionListener(ThreadPool threadPool, String executor, ActionListener<Releasable> listener,
+                                                  boolean forceExecution) {
+            this.threadPool = threadPool;
+            this.executor = executor;
+            this.listener = listener;
+            this.forceExecution = forceExecution;
+        }
+
+        @Override
+        public void onResponse(final Releasable releasable) {
+            threadPool.executor(executor).execute(new AbstractRunnable() {
+                @Override
+                public boolean isForceExecution() {
+                    return forceExecution;
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    listener.onResponse(releasable);
+                }
+
+                @Override
+                public void onRejection(Exception e) {
+                    IOUtils.closeWhileHandlingException(releasable);
+                    super.onRejection(e);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e); // will possibly execute on the caller thread
+                }
+            });
+        }
+
+        @Override
+        public void onFailure(final Exception e) {
+            listener.onFailure(e);
+        }
+    }
+
 }

+ 49 - 5
core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java

@@ -21,6 +21,9 @@ package org.elasticsearch.index.shard;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
+import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -47,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.instanceOf;
@@ -59,7 +63,18 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
 
     @BeforeClass
     public static void setupThreadPool() {
-        threadPool = new TestThreadPool("IndexShardOperationsLockTests");
+        int bulkThreadPoolSize = randomIntBetween(1, 2);
+        int bulkThreadPoolQueueSize = randomIntBetween(1, 2);
+        threadPool = new TestThreadPool("IndexShardOperationsLockTests",
+            Settings.builder()
+                .put("thread_pool." + ThreadPool.Names.BULK + ".size", bulkThreadPoolSize)
+                .put("thread_pool." + ThreadPool.Names.BULK + ".queue_size", bulkThreadPoolQueueSize)
+                .build());
+        assertThat(threadPool.executor(ThreadPool.Names.BULK), instanceOf(EsThreadPoolExecutor.class));
+        assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getCorePoolSize(), equalTo(bulkThreadPoolSize));
+        assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getMaximumPoolSize(), equalTo(bulkThreadPoolSize));
+        assertThat(((EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.BULK)).getQueue().remainingCapacity(),
+            equalTo(bulkThreadPoolQueueSize));
     }
 
     @AfterClass
@@ -82,33 +97,53 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
     public void testAllOperationsInvoked() throws InterruptedException, TimeoutException, ExecutionException {
         int numThreads = 10;
 
+        class DummyException extends RuntimeException {}
+
         List<PlainActionFuture<Releasable>> futures = new ArrayList<>();
         List<Thread> operationThreads = new ArrayList<>();
-        CountDownLatch latch = new CountDownLatch(numThreads / 2);
+        CountDownLatch latch = new CountDownLatch(numThreads / 4);
+        boolean forceExecution = randomBoolean();
         for (int i = 0; i < numThreads; i++) {
+            // the bulk thread pool uses a bounded size and can get rejections, see setupThreadPool
+            String threadPoolName = randomFrom(ThreadPool.Names.BULK, ThreadPool.Names.GENERIC);
+            boolean failingListener = randomBoolean();
             PlainActionFuture<Releasable> future = new PlainActionFuture<Releasable>() {
                 @Override
                 public void onResponse(Releasable releasable) {
                     releasable.close();
-                    super.onResponse(releasable);
+                    if (failingListener) {
+                        throw new DummyException();
+                    } else {
+                        super.onResponse(releasable);
+                    }
                 }
             };
             Thread thread = new Thread() {
                 public void run() {
                     latch.countDown();
-                    permits.acquire(future, ThreadPool.Names.GENERIC, true);
+                    try {
+                        permits.acquire(future, threadPoolName, forceExecution);
+                    } catch (DummyException dummyException) {
+                        // ok, notify future
+                        assertTrue(failingListener);
+                        future.onFailure(dummyException);
+                    }
                 }
             };
             futures.add(future);
             operationThreads.add(thread);
         }
 
+        boolean closeAfterBlocking = randomBoolean();
         CountDownLatch blockFinished = new CountDownLatch(1);
         threadPool.generic().execute(() -> {
             try {
                 latch.await();
                 blockAndWait().close();
                 blockFinished.countDown();
+                if (closeAfterBlocking) {
+                    permits.close();
+                }
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
@@ -119,7 +154,16 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
         }
 
         for (PlainActionFuture<Releasable> future : futures) {
-            assertNotNull(future.get(1, TimeUnit.MINUTES));
+            try {
+                assertNotNull(future.get(1, TimeUnit.MINUTES));
+            } catch (ExecutionException e) {
+                if (closeAfterBlocking) {
+                    assertThat(e.getCause(), either(instanceOf(DummyException.class)).or(instanceOf(EsRejectedExecutionException.class))
+                        .or(instanceOf(IndexShardClosedException.class)));
+                } else {
+                    assertThat(e.getCause(), either(instanceOf(DummyException.class)).or(instanceOf(EsRejectedExecutionException.class)));
+                }
+            }
         }
 
         for (Thread thread : operationThreads) {