Browse Source

Introduce PlainActionFuture#result (#94474)

It's sometimes useful to retrieve the result of a future that we know to
be complete, so that no blocking is required. This commit introduces a
`result()` method to do this without needing to specify a zero timeout,
and an `actionResult()` method that acts like `result()` except that it
unwraps exceptions like `actionGet()` and friends.
David Turner 2 years ago
parent
commit
8ad8694116
18 changed files with 213 additions and 102 deletions
  1. 7 12
      server/src/main/java/org/elasticsearch/action/StepListener.java
  2. 2 3
      server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java
  3. 52 2
      server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java
  4. 4 3
      server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java
  5. 2 2
      server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java
  6. 2 3
      server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java
  7. 1 1
      server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java
  8. 2 13
      server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java
  9. 1 1
      server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java
  10. 3 5
      server/src/test/java/org/elasticsearch/action/StepListenerTests.java
  11. 108 16
      server/src/test/java/org/elasticsearch/action/support/PlainActionFutureTests.java
  12. 15 15
      server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java
  13. 2 2
      server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java
  14. 1 1
      server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
  15. 1 1
      server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java
  16. 2 2
      x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ProgressListenableActionFuture.java
  17. 1 3
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java
  18. 7 17
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/SecondaryAuthenticatorTests.java

+ 7 - 12
server/src/main/java/org/elasticsearch/action/StepListener.java

@@ -8,12 +8,9 @@
 
 package org.elasticsearch.action;
 
-import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.CheckedConsumer;
 
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -92,20 +89,18 @@ public final class StepListener<Response> implements ActionListener<Response> {
     }
 
     /**
-     * Returns the future associated with the given step listener
+     * @return the result of this step, if it has been completed successfully, or throw the exception with which it was completed
+     * exceptionally. It is not valid to call this method if the step is incomplete.
      */
-    public Future<Response> asFuture() {
-        return delegate;
+    public Response result() {
+        return delegate.result();
     }
 
     /**
-     * Gets the result of this step. This method will throw {@link IllegalStateException} if this step is not completed yet.
+     * @return whether this step is complete yet.
      */
-    public Response result() {
-        if (delegate.isDone() == false) {
-            throw new IllegalStateException("step is not completed yet");
-        }
-        return FutureUtils.get(delegate, 0L, TimeUnit.NANOSECONDS); // this future is done already - use a non-blocking method.
+    public boolean isDone() {
+        return delegate.isDone();
     }
 
     /**

+ 2 - 3
server/src/main/java/org/elasticsearch/action/support/ListenableActionFuture.java

@@ -93,9 +93,8 @@ public class ListenableActionFuture<T> extends PlainActionFuture<T> {
     }
 
     private void executeListener(final ActionListener<T> listener) {
-        // we use a timeout of 0 to by pass assertion forbidding to call actionGet() (blocking) on a network thread.
-        // here we know we will never block
-        ActionListener.completeWith(listener, () -> actionGet(0));
+        // call non-blocking actionResult() as we could be on a network or scheduler thread which we must not block
+        ActionListener.completeWith(listener, this::actionResult);
     }
 
 }

+ 52 - 2
server/src/main/java/org/elasticsearch/action/support/PlainActionFuture.java

@@ -222,6 +222,36 @@ public class PlainActionFuture<T> implements ActionFuture<T>, ActionListener<T>
         }
     }
 
+    /**
+     * Return the result of this future, similarly to {@link FutureUtils#get} with a zero timeout except that this method ignores the
+     * interrupted status of the calling thread.
+     * <p>
+     * As with {@link FutureUtils#get}, if the future completed exceptionally with a {@link RuntimeException} then this method throws that
+     * exception, but if the future completed exceptionally with an exception that is not a {@link RuntimeException} then this method throws
+     * an {@link UncategorizedExecutionException} whose cause is an {@link ExecutionException} whose cause is the completing exception.
+     * <p>
+     * It is not valid to call this method if the future is incomplete.
+     *
+     * @return the result of this future, if it has been completed successfully.
+     * @throws RuntimeException if this future was completed exceptionally, wrapping checked exceptions as described above.
+     * @throws CancellationException if this future was cancelled.
+     */
+    public T result() {
+        return sync.result();
+    }
+
+    /**
+     * Return the result of this future, if it has been completed successfully, or unwrap and throw the exception with which it was
+     * completed exceptionally. It is not valid to call this method if the future is incomplete.
+     */
+    public T actionResult() {
+        try {
+            return result();
+        } catch (ElasticsearchException e) {
+            throw unwrapEsException(e);
+        }
+    }
+
     /**
      * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
      * private subclass to hold the synchronizer.  This synchronizer is used to
@@ -320,6 +350,26 @@ public class PlainActionFuture<T> implements ActionFuture<T>, ActionListener<T>
             }
         }
 
+        V result() {
+            final int state = getState();
+            switch (state) {
+                case COMPLETED:
+                    if (exception instanceof RuntimeException runtimeException) {
+                        throw runtimeException;
+                    } else if (exception != null) {
+                        throw new UncategorizedExecutionException("Failed execution", new ExecutionException(exception));
+                    } else {
+                        return value;
+                    }
+                case CANCELLED:
+                    throw new CancellationException("Task was cancelled.");
+                default:
+                    final var message = "Error, synchronizer in invalid state: " + state;
+                    assert false : message;
+                    throw new IllegalStateException(message);
+            }
+        }
+
         /**
          * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
          */
@@ -385,8 +435,8 @@ public class PlainActionFuture<T> implements ActionFuture<T>, ActionListener<T>
 
     private static RuntimeException unwrapEsException(ElasticsearchException esEx) {
         Throwable root = esEx.unwrapCause();
-        if (root instanceof RuntimeException) {
-            return (RuntimeException) root;
+        if (root instanceof RuntimeException runtimeException) {
+            return runtimeException;
         }
         return new UncategorizedExecutionException("Failed execution", root);
     }

+ 4 - 3
server/src/main/java/org/elasticsearch/common/util/CancellableSingleObjectCache.java

@@ -136,9 +136,10 @@ public abstract class CancellableSingleObjectCache<Input, Key, Value> {
 
                 // Our item was only just released, possibly cancelled, by another get() with a fresher key. We don't simply retry
                 // since that would evict the new item. Instead let's see if it was cancelled or whether it completed properly.
-                if (currentCachedItem.getFuture().isDone()) {
+                final var future = currentCachedItem.getFuture();
+                if (future.isDone()) {
                     try {
-                        listener.onResponse(currentCachedItem.getFuture().actionGet(0L));
+                        listener.onResponse(future.actionResult());
                         return;
                     } catch (TaskCancelledException e) {
                         // previous task was cancelled before completion, therefore we must perform our own one-shot refresh
@@ -225,7 +226,7 @@ public abstract class CancellableSingleObjectCache<Input, Key, Value> {
                 if (future.isDone()) {
                     // No need to bother with ref counting & cancellation any more, just complete the listener.
                     // We know it wasn't cancelled because there are still references.
-                    ActionListener.completeWith(listener, () -> future.actionGet(0L));
+                    ActionListener.completeWith(listener, future::actionResult);
                 } else {
                     // Refresh is still pending; it's not cancelled because there are still references.
                     future.addListener(ContextPreservingActionListener.wrapPreservingContext(listener, threadContext));

+ 2 - 2
server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java

@@ -74,8 +74,8 @@ public class FutureUtils {
     }
 
     public static RuntimeException rethrowExecutionException(ExecutionException e) {
-        if (e.getCause() instanceof RuntimeException) {
-            return (RuntimeException) e.getCause();
+        if (e.getCause()instanceof RuntimeException runtimeException) {
+            return runtimeException;
         } else {
             return new UncategorizedExecutionException("Failed execution", e);
         }

+ 2 - 3
server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java

@@ -18,7 +18,6 @@ import org.elasticsearch.core.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 
 /**
  * An {@link ActionListener} which allows for the result to fan out to a (dynamic) collection of other listeners, added using {@link
@@ -107,8 +106,8 @@ public final class ListenableFuture<V> extends PlainActionFuture<V> {
 
     private void notifyListener(ActionListener<V> listener) {
         assert done;
-        // call get() in a non-blocking fashion as we could be on a network or scheduler thread which we must not block
-        ActionListener.completeWith(listener, () -> FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS));
+        // call non-blocking result() as we could be on a network or scheduler thread which we must not block
+        ActionListener.completeWith(listener, this::result);
     }
 
     @Override

+ 1 - 1
server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java

@@ -256,7 +256,7 @@ final class CompositeIndexEventListener implements IndexEventListener {
                 nextListener.beforeIndexShardRecovery(indexShard, indexSettings, future);
                 if (future.isDone()) {
                     // common case, not actually async, so just check for an exception and continue on the same thread
-                    future.get();
+                    future.result();
                     continue;
                 }
             } catch (Exception e) {

+ 2 - 13
server/src/main/java/org/elasticsearch/indices/TimestampFieldMapperService.java

@@ -10,7 +10,6 @@ package org.elasticsearch.indices;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterStateApplier;
@@ -23,7 +22,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.Nullable;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.mapper.DateFieldMapper;
@@ -174,17 +172,8 @@ public class TimestampFieldMapperService extends AbstractLifecycleComponent impl
         if (future == null || future.isDone() == false) {
             return null;
         }
-
-        try {
-            // It's possible that callers of this class are executed
-            // in a transport thread, for that reason we request
-            // the future value with a timeout of 0. That won't
-            // trigger assertion errors.
-            return future.actionGet(TimeValue.ZERO);
-        } catch (ElasticsearchTimeoutException e) {
-            assert false : "Unexpected timeout exception while getting a timestamp mapping";
-            throw e;
-        }
+        // call non-blocking actionResult() as we could be on a network or scheduler thread which we must not block
+        return future.actionResult();
     }
 
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

@@ -343,7 +343,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
                 final Transport.Connection connection = openConnectionStep.result();
                 final DiscoveryNode node = connection.getNode();
                 logger.debug(() -> format("[%s] failed to open managed connection to seed node: [%s]", clusterAlias, node), e);
-                IOUtils.closeWhileHandlingException(openConnectionStep.result());
+                IOUtils.closeWhileHandlingException(connection);
                 onFailure.accept(e);
             });
         } else {

+ 3 - 5
server/src/test/java/org/elasticsearch/action/StepListenerTests.java

@@ -28,7 +28,6 @@ import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.notNullValue;
 
 public class StepListenerTests extends ESTestCase {
     private ThreadPool threadPool;
@@ -92,7 +91,7 @@ public class StepListenerTests extends ESTestCase {
 
         if (failedStep == 1) {
             assertThat(expectThrows(RuntimeException.class, step1::result).getMessage(), equalTo("failed at step 1"));
-            assertThat(expectThrows(RuntimeException.class, step2::result).getMessage(), equalTo("step is not completed yet"));
+            assertFalse(step2.isDone());
         } else {
             assertThat(step1.result(), equalTo("hello"));
             assertThat(expectThrows(RuntimeException.class, step2::result).getMessage(), equalTo("failed at step 2"));
@@ -131,8 +130,7 @@ public class StepListenerTests extends ESTestCase {
         Collections.shuffle(stepListeners, random());
 
         final StepListener<Integer> combined = stepListeners.get(0).thenCombine(stepListeners.get(1), Math::max);
-        assertThat(combined.asFuture(), notNullValue());
-        assertThat(combined.asFuture().isDone(), equalTo(false));
+        assertFalse(combined.isDone());
 
         final List<Integer> results = Collections.synchronizedList(new ArrayList<>(stepListeners.size()));
         final CountDownLatch latch = new CountDownLatch(stepListeners.size());
@@ -157,7 +155,7 @@ public class StepListenerTests extends ESTestCase {
         }
 
         latch.await();
-        assertThat(combined.asFuture().isDone(), equalTo(true));
+        assertTrue(combined.isDone());
         if (failed.get() == false) {
             assertThat(combined.result(), equalTo(results.stream().reduce(Math::max).get()));
         } else {

+ 108 - 16
server/src/test/java/org/elasticsearch/action/support/PlainActionFutureTests.java

@@ -8,13 +8,16 @@
 
 package org.elasticsearch.action.support;
 
+import org.elasticsearch.Assertions;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.RemoteTransportException;
 
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -23,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class PlainActionFutureTests extends ESTestCase {
 
     public void testInterruption() throws Exception {
-        final PlainActionFuture<Object> adapter = new PlainActionFuture<>() {
+        final PlainActionFuture<Object> future = new PlainActionFuture<>() {
             @Override
             public void onResponse(Object value) {
                 throw new AssertionError("should not be called");
@@ -34,11 +37,11 @@ public class PlainActionFutureTests extends ESTestCase {
         final Runnable runnable = () -> {
             final int method = randomIntBetween(0, 4);
             switch (method) {
-                case 0 -> adapter.actionGet();
-                case 1 -> adapter.actionGet("30s");
-                case 2 -> adapter.actionGet(30000);
-                case 3 -> adapter.actionGet(TimeValue.timeValueSeconds(30));
-                case 4 -> adapter.actionGet(30, TimeUnit.SECONDS);
+                case 0 -> future.actionGet();
+                case 1 -> future.actionGet("30s");
+                case 2 -> future.actionGet(30000);
+                case 3 -> future.actionGet(TimeValue.timeValueSeconds(30));
+                case 4 -> future.actionGet(30, TimeUnit.SECONDS);
                 default -> throw new AssertionError(method);
             }
         };
@@ -70,6 +73,13 @@ public class PlainActionFutureTests extends ESTestCase {
         thread.join();
     }
 
+    public void testNoResult() {
+        assumeTrue("assertions required for this test", Assertions.ENABLED);
+        final var future = new PlainActionFuture<>();
+        expectThrows(AssertionError.class, future::result);
+        expectThrows(AssertionError.class, future::actionResult);
+    }
+
     public void testUnwrapException() {
         checkUnwrap(new RemoteTransportException("test", new RuntimeException()), RuntimeException.class, RemoteTransportException.class);
         checkUnwrap(
@@ -82,17 +92,99 @@ public class PlainActionFutureTests extends ESTestCase {
     }
 
     private void checkUnwrap(Exception exception, Class<? extends Exception> actionGetException, Class<? extends Exception> getException) {
-        final PlainActionFuture<Void> adapter = new PlainActionFuture<>() {
-            @Override
-            public void onResponse(Void value) {
-                throw new AssertionError("should not be called");
+        final var future = new PlainActionFuture<>();
+        future.onFailure(exception);
+
+        assertEquals(actionGetException, expectThrows(RuntimeException.class, future::actionGet).getClass());
+        assertEquals(actionGetException, expectThrows(RuntimeException.class, () -> future.actionGet(10, TimeUnit.SECONDS)).getClass());
+        assertEquals(actionGetException, expectThrows(RuntimeException.class, future::actionResult).getClass());
+        assertEquals(actionGetException, expectThrows(RuntimeException.class, expectIgnoresInterrupt(future::actionResult)).getClass());
+        assertEquals(getException, expectThrows(ExecutionException.class, future::get).getCause().getClass());
+        assertEquals(getException, expectThrows(ExecutionException.class, () -> future.get(10, TimeUnit.SECONDS)).getCause().getClass());
+
+        if (exception instanceof RuntimeException) {
+            assertEquals(getException, expectThrows(Exception.class, future::result).getClass());
+            assertEquals(getException, expectThrows(Exception.class, expectIgnoresInterrupt(future::result)).getClass());
+            assertEquals(getException, expectThrows(Exception.class, () -> FutureUtils.get(future)).getClass());
+            assertEquals(getException, expectThrows(Exception.class, () -> FutureUtils.get(future, 10, TimeUnit.SECONDS)).getClass());
+        } else {
+            assertEquals(getException, expectThrowsWrapped(future::result).getClass());
+            assertEquals(getException, expectThrowsWrapped(expectIgnoresInterrupt(future::result)).getClass());
+            assertEquals(getException, expectThrowsWrapped(() -> FutureUtils.get(future)).getClass());
+            assertEquals(getException, expectThrowsWrapped(() -> FutureUtils.get(future, 10, TimeUnit.SECONDS)).getClass());
+        }
+
+        assertCapturesInterrupt(future::get);
+        assertCapturesInterrupt(() -> future.get(10, TimeUnit.SECONDS));
+        assertPropagatesInterrupt(future::actionGet);
+        assertPropagatesInterrupt(() -> future.actionGet(10, TimeUnit.SECONDS));
+    }
+
+    private static Throwable expectThrowsWrapped(ThrowingRunnable runnable) {
+        return expectThrows(UncategorizedExecutionException.class, ExecutionException.class, runnable).getCause();
+    }
+
+    public void testCancelException() {
+        final var future = new PlainActionFuture<>();
+        future.cancel(randomBoolean());
+
+        assertCancellation(future::get);
+        assertCancellation(future::actionGet);
+        assertCancellation(() -> future.get(10, TimeUnit.SECONDS));
+        assertCancellation(() -> future.actionGet(10, TimeUnit.SECONDS));
+        assertCancellation(future::result);
+        assertCancellation(future::actionResult);
+
+        try {
+            Thread.currentThread().interrupt();
+            assertCancellation(future::result);
+            assertCancellation(future::actionResult);
+        } finally {
+            assertTrue(Thread.interrupted());
+        }
+
+        assertCapturesInterrupt(future::get);
+        assertCapturesInterrupt(() -> future.get(10, TimeUnit.SECONDS));
+        assertPropagatesInterrupt(future::actionGet);
+        assertPropagatesInterrupt(() -> future.actionGet(10, TimeUnit.SECONDS));
+    }
+
+    private static void assertCancellation(ThrowingRunnable runnable) {
+        final var cancellationException = expectThrows(CancellationException.class, runnable);
+        assertEquals("Task was cancelled.", cancellationException.getMessage());
+        assertNull(cancellationException.getCause());
+    }
+
+    private static void assertCapturesInterrupt(ThrowingRunnable runnable) {
+        try {
+            Thread.currentThread().interrupt();
+            final var interruptedException = expectThrows(InterruptedException.class, runnable);
+            assertNull(interruptedException.getMessage());
+            assertNull(interruptedException.getCause());
+        } finally {
+            assertFalse(Thread.interrupted());
+        }
+    }
+
+    private static void assertPropagatesInterrupt(ThrowingRunnable runnable) {
+        try {
+            Thread.currentThread().interrupt();
+            final var interruptedException = expectThrows(IllegalStateException.class, InterruptedException.class, runnable);
+            assertNull(interruptedException.getMessage());
+            assertNull(interruptedException.getCause());
+        } finally {
+            assertTrue(Thread.interrupted());
+        }
+    }
+
+    private static ThrowingRunnable expectIgnoresInterrupt(ThrowingRunnable runnable) {
+        return () -> {
+            try {
+                Thread.currentThread().interrupt();
+                runnable.run();
+            } finally {
+                assertTrue(Thread.interrupted());
             }
         };
-
-        adapter.onFailure(exception);
-        assertEquals(actionGetException, expectThrows(RuntimeException.class, adapter::actionGet).getClass());
-        assertEquals(actionGetException, expectThrows(RuntimeException.class, () -> adapter.actionGet(10, TimeUnit.SECONDS)).getClass());
-        assertEquals(getException, expectThrows(ExecutionException.class, () -> adapter.get()).getCause().getClass());
-        assertEquals(getException, expectThrows(ExecutionException.class, () -> adapter.get(10, TimeUnit.SECONDS)).getCause().getClass());
     }
 }

+ 15 - 15
server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java

@@ -65,14 +65,14 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
         assertFalse(future1.isDone());
         testCache.assertPendingRefreshes(1);
         testCache.completeNextRefresh("foo", 1);
-        assertThat(future0.actionGet(0L), equalTo(1));
-        assertThat(future0.actionGet(0L), sameInstance(future1.actionGet(0L)));
+        assertThat(future0.result(), equalTo(1));
+        assertThat(future0.result(), sameInstance(future1.result()));
 
         // A further get() call with a matching key re-uses the cached value
         final TestFuture future2 = new TestFuture();
         testCache.get("foo", () -> false, future2);
         testCache.assertNoPendingRefreshes();
-        assertThat(future2.actionGet(0L), sameInstance(future1.actionGet(0L)));
+        assertThat(future2.result(), sameInstance(future1.result()));
 
         // A call with a different key triggers another refresh
         final TestFuture future3 = new TestFuture();
@@ -80,7 +80,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
         assertFalse(future3.isDone());
         testCache.assertPendingRefreshes(1);
         testCache.completeNextRefresh("bar", 2);
-        assertThat(future3.actionGet(0L), equalTo(2));
+        assertThat(future3.result(), equalTo(2));
     }
 
     public void testListenerCompletedByRefreshEvenIfDiscarded() {
@@ -99,10 +99,10 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
         testCache.get("foo", () -> false, future2);
         testCache.assertPendingRefreshes(1);
         testCache.completeNextRefresh("foo", 1);
-        assertThat(future2.actionGet(0L), equalTo(1));
+        assertThat(future2.result(), equalTo(1));
 
         // ... and the original listener is also completed successfully
-        assertThat(future1.actionGet(0L), sameInstance(future2.actionGet(0L)));
+        assertThat(future1.result(), sameInstance(future2.result()));
     }
 
     public void testListenerCompletedWithCancellationExceptionIfRefreshCancelled() {
@@ -122,9 +122,9 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
         testCache.get("bar", () -> false, future2);
         testCache.assertPendingRefreshes(2);
         testCache.assertNextRefreshCancelled();
-        expectThrows(TaskCancelledException.class, () -> future1.actionGet(0L));
+        expectThrows(TaskCancelledException.class, future1::result);
         testCache.completeNextRefresh("bar", 2);
-        assertThat(future2.actionGet(0L), equalTo(2));
+        assertThat(future2.result(), equalTo(2));
     }
 
     public void testListenerCompletedWithFresherInputIfSuperseded() {
@@ -143,8 +143,8 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
         assertFalse(future1.isDone());
 
         testCache.completeNextRefresh("bar", 2);
-        assertThat(future2.actionGet(0L), equalTo(2));
-        assertThat(future1.actionGet(0L), equalTo(2));
+        assertThat(future2.result(), equalTo(2));
+        assertThat(future1.result(), equalTo(2));
     }
 
     public void testRunsCancellationChecksEvenWhenSuperseded() {
@@ -166,7 +166,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
 
         isCancelled.set(true);
         testCache.completeNextRefresh("bar", 1);
-        expectThrows(TaskCancelledException.class, () -> future1.actionGet(0L));
+        expectThrows(TaskCancelledException.class, future1::result);
     }
 
     public void testExceptionCompletesListenersButIsNotCached() {
@@ -180,8 +180,8 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
         testCache.assertPendingRefreshes(1);
         final ElasticsearchException exception = new ElasticsearchException("simulated");
         testCache.completeNextRefresh(exception);
-        assertSame(exception, expectThrows(ElasticsearchException.class, () -> future0.actionGet(0L)));
-        assertSame(exception, expectThrows(ElasticsearchException.class, () -> future1.actionGet(0L)));
+        assertSame(exception, expectThrows(ElasticsearchException.class, future0::result));
+        assertSame(exception, expectThrows(ElasticsearchException.class, future1::result));
 
         testCache.assertNoPendingRefreshes();
         // The exception is not cached, however, so a subsequent get() call with a matching key performs another refresh
@@ -189,7 +189,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
         testCache.get("foo", () -> false, future2);
         testCache.assertPendingRefreshes(1);
         testCache.completeNextRefresh("foo", 1);
-        assertThat(future2.actionGet(0L), equalTo(1));
+        assertThat(future2.actionResult(), equalTo(1));
     }
 
     public void testConcurrentRefreshesAndCancellation() throws InterruptedException {
@@ -430,7 +430,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
         testCache.get("successful", () -> false, successfulFuture);
         cancelledThread.join();
 
-        expectThrows(TaskCancelledException.class, () -> cancelledFuture.actionGet(0L));
+        expectThrows(TaskCancelledException.class, cancelledFuture::result);
     }
 
     private static final ThreadContext testThreadContext = new ThreadContext(Settings.EMPTY);

+ 2 - 2
server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java

@@ -192,10 +192,10 @@ public class ListenableFutureTests extends ESTestCase {
             barrier.await(10, TimeUnit.SECONDS); // release blocked executor
 
             if (success) {
-                expectThrows(EsRejectedExecutionException.class, () -> future2.actionGet(0, TimeUnit.SECONDS));
+                expectThrows(EsRejectedExecutionException.class, future2::result);
                 assertNull(future1.actionGet(10, TimeUnit.SECONDS));
             } else {
-                var exception = expectThrows(EsRejectedExecutionException.class, () -> future2.actionGet(0, TimeUnit.SECONDS));
+                var exception = expectThrows(EsRejectedExecutionException.class, future2::result);
                 assertEquals(1, exception.getSuppressed().length);
                 assertThat(exception.getSuppressed()[0], instanceOf(ElasticsearchException.class));
                 assertEquals(

+ 1 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -270,7 +270,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 (BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo")
             );
             deterministicTaskQueue.runAllRunnableTasks();
-            assertNull(future.actionGet(0));
+            assertNull(future.result());
         } finally {
             testClusterNodes.nodes.values().forEach(TestClusterNodes.TestClusterNode::stop);
         }

+ 1 - 1
server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java

@@ -646,7 +646,7 @@ public class ClusterConnectionManagerTests extends ESTestCase {
 
         assertTrue("threads did not all complete", countDownLatch.await(10, TimeUnit.SECONDS));
         assertFalse(closingRefs.hasReferences());
-        assertTrue(cleanlyOpenedConnectionFuture.actionGet(0, TimeUnit.SECONDS));
+        assertTrue(cleanlyOpenedConnectionFuture.result());
 
         assertTrue("validatorPermits not all released", validatorPermits.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
         connectionManager.close();

+ 2 - 2
x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/ProgressListenableActionFuture.java

@@ -142,7 +142,7 @@ class ProgressListenableActionFuture extends PlainActionFuture<Long> {
             listeners = null;
         }
         if (listenersToExecute != null) {
-            listenersToExecute.stream().map(Tuple::v2).forEach(listener -> executeListener(listener, () -> actionGet(0L)));
+            listenersToExecute.stream().map(Tuple::v2).forEach(listener -> executeListener(listener, this::actionResult));
         }
         assert invariant();
     }
@@ -171,7 +171,7 @@ class ProgressListenableActionFuture extends PlainActionFuture<Long> {
             }
         }
         if (executeImmediate) {
-            executeListener(listener, completed ? () -> actionGet(0L) : () -> progressValue);
+            executeListener(listener, completed ? this::actionResult : () -> progressValue);
         }
         assert invariant();
     }

+ 1 - 3
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java

@@ -53,7 +53,6 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -127,7 +126,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
@@ -1038,7 +1036,7 @@ public class ApiKeyService {
 
     // pkg private for testing
     CachedApiKeyHashResult getFromCache(String id) {
-        return apiKeyAuthCache == null ? null : FutureUtils.get(apiKeyAuthCache.get(id), 0L, TimeUnit.MILLISECONDS);
+        return apiKeyAuthCache == null ? null : apiKeyAuthCache.get(id).result();
     }
 
     // pkg private for testing

+ 7 - 17
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/support/SecondaryAuthenticatorTests.java

@@ -65,7 +65,6 @@ import java.util.Base64;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
@@ -169,7 +168,7 @@ public class SecondaryAuthenticatorTests extends ESTestCase {
         final PlainActionFuture<SecondaryAuthentication> future = new PlainActionFuture<>();
         authenticator.authenticate(AuthenticateAction.NAME, request, future);
 
-        assertThat(future.get(0, TimeUnit.MILLISECONDS), nullValue());
+        assertThat(future.result(), nullValue());
     }
 
     public void testAuthenticateRestRequestIsANoOpIfHeaderIsMissing() throws Exception {
@@ -177,7 +176,7 @@ public class SecondaryAuthenticatorTests extends ESTestCase {
         final PlainActionFuture<SecondaryAuthentication> future = new PlainActionFuture<>();
         authenticator.authenticateAndAttachToContext(request, future);
 
-        assertThat(future.get(0, TimeUnit.MILLISECONDS), nullValue());
+        assertThat(future.result(), nullValue());
         assertThat(SecondaryAuthentication.readFromContext(securityContext), nullValue());
     }
 
@@ -187,10 +186,7 @@ public class SecondaryAuthenticatorTests extends ESTestCase {
         final PlainActionFuture<SecondaryAuthentication> future = new PlainActionFuture<>();
         authenticator.authenticate(AuthenticateAction.NAME, request, future);
 
-        final ElasticsearchSecurityException ex = expectThrows(
-            ElasticsearchSecurityException.class,
-            () -> future.actionGet(0, TimeUnit.MILLISECONDS)
-        );
+        final ElasticsearchSecurityException ex = expectThrows(ElasticsearchSecurityException.class, future::actionResult);
         assertThat(ex, TestMatchers.throwableWithMessage(Matchers.containsString("secondary user")));
         assertThat(ex.getCause(), TestMatchers.throwableWithMessage(Matchers.containsString("credentials")));
     }
@@ -201,10 +197,7 @@ public class SecondaryAuthenticatorTests extends ESTestCase {
         final PlainActionFuture<SecondaryAuthentication> future = new PlainActionFuture<>();
         authenticator.authenticateAndAttachToContext(request, future);
 
-        final ElasticsearchSecurityException ex = expectThrows(
-            ElasticsearchSecurityException.class,
-            () -> future.actionGet(0, TimeUnit.MILLISECONDS)
-        );
+        final ElasticsearchSecurityException ex = expectThrows(ElasticsearchSecurityException.class, future::actionResult);
         assertThat(ex, TestMatchers.throwableWithMessage(Matchers.containsString("secondary user")));
         assertThat(ex.getCause(), TestMatchers.throwableWithMessage(Matchers.containsString("credentials")));
 
@@ -245,7 +238,7 @@ public class SecondaryAuthenticatorTests extends ESTestCase {
             future.onResponse(result);
         }, e -> future.onFailure(e)));
 
-        final SecondaryAuthentication secondaryAuthentication = future.get(0, TimeUnit.MILLISECONDS);
+        final SecondaryAuthentication secondaryAuthentication = future.result();
         assertThat(secondaryAuthentication, Matchers.notNullValue());
         assertThat(secondaryAuthentication.getAuthentication(), Matchers.notNullValue());
         assertThat(secondaryAuthentication.getAuthentication().getEffectiveSubject().getUser().principal(), equalTo(user));
@@ -288,10 +281,7 @@ public class SecondaryAuthenticatorTests extends ESTestCase {
             future.onFailure(e);
         }));
 
-        final ElasticsearchSecurityException ex = expectThrows(
-            ElasticsearchSecurityException.class,
-            () -> future.actionGet(0, TimeUnit.MILLISECONDS)
-        );
+        final ElasticsearchSecurityException ex = expectThrows(ElasticsearchSecurityException.class, future::actionResult);
 
         assertThat(ex, TestMatchers.throwableWithMessage(Matchers.containsString("secondary user")));
         assertThat(ex.getCause(), TestMatchers.throwableWithMessage(Matchers.containsString(user)));
@@ -325,7 +315,7 @@ public class SecondaryAuthenticatorTests extends ESTestCase {
         final PlainActionFuture<SecondaryAuthentication> future = new PlainActionFuture<>();
         authenticator.authenticate(AuthenticateAction.NAME, request, future);
 
-        final SecondaryAuthentication secondaryAuthentication = future.actionGet(0, TimeUnit.MILLISECONDS);
+        final SecondaryAuthentication secondaryAuthentication = future.actionResult();
         assertThat(secondaryAuthentication, Matchers.notNullValue());
         assertThat(secondaryAuthentication.getAuthentication(), Matchers.notNullValue());
         assertThat(secondaryAuthentication.getAuthentication().getEffectiveSubject().getUser(), equalTo(user));