Browse Source

Remove Unused IndexShardOperationPermits#blockOperations (#65376)

Follow up to #64098, this method was only used in tests now so this PR removes it.
Armin Braun 4 years ago
parent
commit
778e7d8b65

+ 2 - 2
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -673,7 +673,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             throws IllegalIndexShardStateException, IllegalStateException {
         assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
         try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
-            indexShardOperationPermits.asyncBlockOperations(new ActionListener<>() {
+            indexShardOperationPermits.blockOperations(new ActionListener<>() {
                 @Override
                 public void onResponse(Releasable releasable) {
                     boolean success = false;
@@ -2814,7 +2814,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             onPermitAcquired.onFailure(e);
         });
         try {
-            indexShardOperationPermits.asyncBlockOperations(wrappedListener, timeout, timeUnit, ThreadPool.Names.GENERIC);
+            indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, ThreadPool.Names.GENERIC);
         } catch (Exception e) {
             forceRefreshes.close();
             throw e;

+ 7 - 33
server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java

@@ -24,7 +24,6 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
-import org.elasticsearch.common.CheckedRunnable;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -89,42 +88,18 @@ final class IndexShardOperationPermits implements Closeable {
     }
 
     /**
-     * Wait for in-flight operations to finish and executes {@code onBlocked} under the guarantee that no new operations are started. Queues
-     * operations that are occurring in the meanwhile and runs them once {@code onBlocked} has executed.
-     *
-     * @param timeout   the maximum time to wait for the in-flight operations block
-     * @param timeUnit  the time unit of the {@code timeout} argument
-     * @param onBlocked the action to run once the block has been acquired
-     * @param <E>       the type of checked exception thrown by {@code onBlocked}
-     * @throws InterruptedException      if calling thread is interrupted
-     * @throws TimeoutException          if timed out waiting for in-flight operations to finish
-     * @throws IndexShardClosedException if operation permit has been closed
-     */
-    <E extends Exception> void blockOperations(
-            final long timeout,
-            final TimeUnit timeUnit,
-            final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
-        delayOperations();
-        try (Releasable ignored = acquireAll(timeout, timeUnit)) {
-            onBlocked.run();
-        } finally {
-            releaseDelayedOperations();
-        }
-    }
-
-    /**
-     * Immediately delays operations and on another thread waits for in-flight operations to finish and then acquires all permits. When all
-     * permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are started. Delayed
-     * operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in this case the
-     * {@code onFailure} handler will be invoked after delayed operations are released.
+     * Immediately delays operations and uses the {@code executor} to wait for in-flight operations to finish and then acquires all
+     * permits. When all permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are
+     * started. Delayed operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in
+     * this case the {@code onFailure} handler will be invoked after delayed operations are released.
      *
      * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
      * @param timeout    the maximum time to wait for the in-flight operations block
      * @param timeUnit   the time unit of the {@code timeout} argument
      * @param executor   executor on which to wait for in-flight operations to finish and acquire all permits
      */
-    public void asyncBlockOperations(final ActionListener<Releasable> onAcquired, final long timeout, final TimeUnit timeUnit,
-                                     String executor)  {
+    public void blockOperations(final ActionListener<Releasable> onAcquired, final long timeout, final TimeUnit timeUnit,
+                                String executor)  {
         delayOperations();
         threadPool.executor(executor).execute(new AbstractRunnable() {
 
@@ -213,8 +188,7 @@ final class IndexShardOperationPermits implements Closeable {
 
     /**
      * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
-     * {@link ActionListener} will be called on the calling thread. During calls of
-     * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed.
+     * {@link ActionListener} will be called on the calling thread.
      * The {@link ActionListener#onResponse(Object)} method will then be called using the provided executor once operations are no
      * longer blocked. Note that the executor will not be used for {@link ActionListener#onFailure(Exception)} calls. Those will run
      * directly on the calling thread, which in case of delays, will be a generic thread. Callers should thus make sure

+ 40 - 50
server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java

@@ -198,10 +198,8 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
 
     public void testBlockIfClosed() {
         permits.close();
-        expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES,
-            () -> { throw new IllegalArgumentException("fake error"); }));
         expectThrows(IndexShardClosedException.class,
-            () -> permits.asyncBlockOperations(wrap(() -> { throw new IllegalArgumentException("fake error");}),
+            () -> permits.blockOperations(wrap(() -> { throw new IllegalArgumentException("fake error");}),
                 randomInt(10), TimeUnit.MINUTES, ThreadPool.Names.GENERIC));
     }
 
@@ -222,7 +220,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
         try (Releasable ignored = blockAndWait()) {
             permits.acquire(future, ThreadPool.Names.GENERIC, true, "");
 
-            permits.asyncBlockOperations(wrap(() -> {
+            permits.blockOperations(wrap(() -> {
                 blocked.set(true);
                 blockAcquired.countDown();
                 releaseBlock.await();
@@ -294,27 +292,27 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
         CountDownLatch blockReleased = new CountDownLatch(1);
         boolean throwsException = randomBoolean();
         IndexShardClosedException exception = new IndexShardClosedException(new ShardId("blubb", "id", 0));
-        threadPool.generic().execute(() -> {
-                try {
-                    permits.blockOperations(1, TimeUnit.MINUTES, () -> {
-                        try {
-                            blockAcquired.countDown();
-                            releaseBlock.await();
-                            if (throwsException) {
-                                throw exception;
-                            }
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException();
-                        }
-                    });
-                } catch (Exception e) {
-                    if (e != exception) {
-                        throw new RuntimeException(e);
+        permits.blockOperations(ActionListener.runAfter(new ActionListener<>() {
+            @Override
+            public void onResponse(Releasable releasable) {
+                try (releasable) {
+                    blockAcquired.countDown();
+                    releaseBlock.await();
+                    if (throwsException) {
+                        onFailure(exception);
                     }
-                } finally {
-                    blockReleased.countDown();
+                } catch (InterruptedException e) {
+                    throw new RuntimeException();
                 }
-            });
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                if (e != exception) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }, blockReleased::countDown), 1, TimeUnit.MINUTES, ThreadPool.Names.GENERIC);
         blockAcquired.await();
         return () -> {
             releaseBlock.countDown();
@@ -330,7 +328,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
         final CountDownLatch blockAcquired = new CountDownLatch(1);
         final CountDownLatch releaseBlock = new CountDownLatch(1);
         final AtomicBoolean blocked = new AtomicBoolean();
-        permits.asyncBlockOperations(wrap(() -> {
+        permits.blockOperations(wrap(() -> {
                 blocked.set(true);
                 blockAcquired.countDown();
                 releaseBlock.await();
@@ -382,7 +380,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
         // now we will delay operations while the first operation is still executing (because it is latched)
         final CountDownLatch blockedLatch = new CountDownLatch(1);
         final AtomicBoolean onBlocked = new AtomicBoolean();
-        permits.asyncBlockOperations(wrap(() -> {
+        permits.blockOperations(wrap(() -> {
             onBlocked.set(true);
             blockedLatch.countDown();
         }), 30, TimeUnit.MINUTES, ThreadPool.Names.GENERIC);
@@ -470,7 +468,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
             } catch (final BrokenBarrierException | InterruptedException e) {
                 throw new RuntimeException(e);
             }
-            permits.asyncBlockOperations(wrap(() -> {
+            permits.blockOperations(wrap(() -> {
                 values.add(operations);
                 operationLatch.countDown();
             }), 30, TimeUnit.MINUTES, ThreadPool.Names.GENERIC);
@@ -539,7 +537,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
     public void testAsyncBlockOperationsOnFailure() throws InterruptedException {
         final AtomicReference<Exception> reference = new AtomicReference<>();
         final CountDownLatch onFailureLatch = new CountDownLatch(1);
-        permits.asyncBlockOperations(new ActionListener<Releasable>() {
+        permits.blockOperations(new ActionListener<Releasable>() {
             @Override
             public void onResponse(Releasable releasable) {
                 try (Releasable ignored = releasable) {
@@ -571,30 +569,22 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
 
         operationExecutingLatch.await();
 
-        {
-            final TimeoutException e =
-                    expectThrows(TimeoutException.class, () -> permits.blockOperations(1, TimeUnit.MILLISECONDS, () -> {}));
-            assertThat(e, hasToString(containsString("timeout while blocking operations")));
-        }
-
-        {
-            final AtomicReference<Exception> reference = new AtomicReference<>();
-            final CountDownLatch onFailureLatch = new CountDownLatch(1);
-            permits.asyncBlockOperations(new ActionListener<Releasable>() {
-                @Override
-                public void onResponse(Releasable releasable) {
-                    releasable.close();
-                }
+        final AtomicReference<Exception> reference = new AtomicReference<>();
+        final CountDownLatch onFailureLatch = new CountDownLatch(1);
+        permits.blockOperations(new ActionListener<Releasable>() {
+            @Override
+            public void onResponse(Releasable releasable) {
+                releasable.close();
+            }
 
-                @Override
-                public void onFailure(final Exception e) {
-                    reference.set(e);
-                    onFailureLatch.countDown();
-                }
-            }, 1, TimeUnit.MILLISECONDS, ThreadPool.Names.GENERIC);
-            onFailureLatch.await();
-            assertThat(reference.get(), hasToString(containsString("timeout while blocking operations")));
-        }
+            @Override
+            public void onFailure(final Exception e) {
+                reference.set(e);
+                onFailureLatch.countDown();
+            }
+        }, 1, TimeUnit.MILLISECONDS, ThreadPool.Names.GENERIC);
+        onFailureLatch.await();
+        assertThat(reference.get(), hasToString(containsString("timeout while blocking operations")));
 
         operationLatch.countDown();