Ver código fonte

Use ThreadedActionListener in ListenableFuture (#94367)

Following #94363 we can use `ThreadedActionListener` to represent the
forking that happens within `ListenableFuture`, rather than tracking the
executor and listener as separate components of a Tuple.
David Turner 2 anos atrás
pai
commit
3b9a21f6bd

+ 45 - 73
server/src/main/java/org/elasticsearch/common/util/concurrent/ListenableFuture.java

@@ -13,11 +13,11 @@ import org.elasticsearch.action.support.ContextPreservingActionListener;
 import org.elasticsearch.action.support.ListenableActionFuture;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.ThreadedActionListener;
-import org.elasticsearch.core.Tuple;
+import org.elasticsearch.core.Nullable;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -43,100 +43,72 @@ import java.util.concurrent.TimeUnit;
 public final class ListenableFuture<V> extends PlainActionFuture<V> {
 
     private volatile boolean done = false;
-    private List<Tuple<ActionListener<V>, ExecutorService>> listeners;
+    private List<ActionListener<V>> listeners;
 
     /**
-     * Adds a listener to this future. If the future has not yet completed, the listener will be
-     * notified of a response or exception on the thread completing this future.
-     * If the future has completed, the listener will be notified immediately without forking to
-     * a different thread.
+     * Adds a listener to this future. If the future has not yet completed, the listener will be notified of a response or exception on the
+     * thread completing this future. If the future has completed, the listener will be notified immediately without forking to a different
+     * thread.
      */
     public void addListener(ActionListener<V> listener) {
         addListener(listener, EsExecutors.DIRECT_EXECUTOR_SERVICE, null);
     }
 
     /**
-     * Adds a listener to this future. If the future has not yet completed, the listener will be
-     * notified of a response or exception in a runnable submitted to the ExecutorService provided.
-     * If the future has completed, the listener will be notified immediately without forking to
-     * a different thread.
+     * Adds a listener to this future. If the future has not yet completed, the listener will be notified of a response or exception in a
+     * runnable submitted to the {@link Executor} provided. If the future has completed, the listener will be notified immediately without
+     * forking to a different thread.
      *
-     * It will apply the provided ThreadContext (if not null) when executing the listening.
+     * It will restore the provided {@link ThreadContext} (if not null) when completing the listener.
      */
-    public void addListener(ActionListener<V> listener, ExecutorService executor, ThreadContext threadContext) {
-        if (done) {
+    public void addListener(ActionListener<V> listener, Executor executor, @Nullable ThreadContext threadContext) {
+        if (done || addListenerIfIncomplete(listener, executor, threadContext) == false) {
             // run the callback directly, we don't hold the lock and don't need to fork!
-            notifyListenerDirectly(listener);
-        } else {
-            final boolean run;
-            // check done under lock since it could have been modified and protect modifications
-            // to the list under lock
-            synchronized (this) {
-                if (done) {
-                    run = true;
-                } else {
-                    final ActionListener<V> wrappedListener;
-                    if (threadContext == null) {
-                        wrappedListener = listener;
-                    } else {
-                        wrappedListener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
-                    }
-                    if (listeners == null) {
-                        listeners = new ArrayList<>();
-                    }
-                    listeners.add(new Tuple<>(wrappedListener, executor));
-                    run = false;
-                }
-            }
-
-            if (run) {
-                // run the callback directly, we don't hold the lock and don't need to fork!
-                notifyListenerDirectly(listener);
-            }
+            notifyListener(listener);
         }
     }
 
     @Override
     protected void done(boolean ignored) {
-        final List<Tuple<ActionListener<V>, ExecutorService>> existingListeners;
-        synchronized (this) {
-            done = true;
-            existingListeners = listeners;
-            if (existingListeners == null) {
-                return;
-            }
-            listeners = null;
-        }
-        for (Tuple<ActionListener<V>, ExecutorService> t : existingListeners) {
-            final ExecutorService executorService = t.v2();
-            final ActionListener<V> listener = t.v1();
-            if (executorService == EsExecutors.DIRECT_EXECUTOR_SERVICE) {
-                notifyListenerDirectly(listener);
-            } else {
-                notifyListener(listener, executorService);
+        final var existingListeners = acquireExistingListeners();
+        if (existingListeners != null) {
+            for (final var listener : existingListeners) {
+                notifyListener(listener);
             }
         }
     }
 
-    private void notifyListenerDirectly(ActionListener<V> listener) {
-        // call get in a non-blocking fashion as we could be on a network thread
-        // or another thread like the scheduler, which we should never block!
-        assert done;
-        ActionListener.completeWith(listener, () -> FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS));
+    private synchronized boolean addListenerIfIncomplete(ActionListener<V> listener, Executor executor, ThreadContext threadContext) {
+        // check done under lock since it could have been modified; also protect modifications to the list under lock
+        if (done) {
+            return false;
+        }
+        if (threadContext != null) {
+            listener = ContextPreservingActionListener.wrapPreservingContext(listener, threadContext);
+        }
+        if (executor != EsExecutors.DIRECT_EXECUTOR_SERVICE) {
+            listener = new ThreadedActionListener<>(executor, listener);
+        }
+        if (listeners == null) {
+            listeners = new ArrayList<>();
+        }
+        listeners.add(listener);
+        return true;
     }
 
-    private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
-        ActionListener.run(listener, l -> executorService.execute(new Runnable() {
-            @Override
-            public void run() {
-                notifyListenerDirectly(l);
-            }
+    private synchronized List<ActionListener<V>> acquireExistingListeners() {
+        try {
+            done = true;
+            return listeners;
+        } finally {
+            listeners = null;
+        }
+    }
 
-            @Override
-            public String toString() {
-                return "ListenableFuture notification";
-            }
-        }));
+    private void notifyListener(ActionListener<V> listener) {
+        assert done;
+        // call get() in a non-blocking fashion as we could be on a network or scheduler thread which we must not block
+        ActionListener.completeWith(listener, () -> FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS));
     }
 
     @Override

+ 64 - 1
server/src/test/java/org/elasticsearch/common/util/concurrent/ListenableFutureTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.common.util.concurrent;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ReachabilityChecker;
@@ -19,14 +20,16 @@ import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 
 public class ListenableFutureTests extends ESTestCase {
 
     private ExecutorService executorService;
-    private ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+    private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
 
     @After
     public void stopExecutorService() throws InterruptedException {
@@ -145,4 +148,64 @@ public class ListenableFutureTests extends ESTestCase {
         future.addListener(reachabilityChecker.register(ActionListener.running(() -> {})));
         reachabilityChecker.ensureUnreachable();
     }
+
+    public void testRejection() throws Exception {
+        final CyclicBarrier barrier = new CyclicBarrier(2);
+        final EsThreadPoolExecutor executorService = EsExecutors.newFixed(
+            "testRejection",
+            1,
+            1,
+            EsExecutors.daemonThreadFactory("testRejection"),
+            threadContext,
+            false
+        );
+
+        try {
+            executorService.execute(() -> {
+                try {
+                    barrier.await(10, TimeUnit.SECONDS); // notify main thread that the executor is blocked
+                    barrier.await(10, TimeUnit.SECONDS); // wait for main thread to release us
+                } catch (Exception e) {
+                    throw new AssertionError("unexpected", e);
+                }
+            });
+
+            barrier.await(10, TimeUnit.SECONDS); // wait for executor to be blocked
+
+            final var listenableFuture = new ListenableFuture<Void>();
+            final var future1 = new PlainActionFuture<Void>();
+            final var future2 = new PlainActionFuture<Void>();
+
+            listenableFuture.addListener(future1, executorService, null);
+            listenableFuture.addListener(future2, executorService, null);
+
+            final var success = randomBoolean();
+            if (success) {
+                listenableFuture.onResponse(null);
+            } else {
+                listenableFuture.onFailure(new ElasticsearchException("simulated"));
+            }
+
+            assertFalse(future1.isDone()); // still waiting in the executor queue
+            assertTrue(future2.isDone()); // rejected from the executor on this thread
+
+            barrier.await(10, TimeUnit.SECONDS); // release blocked executor
+
+            if (success) {
+                expectThrows(EsRejectedExecutionException.class, () -> future2.actionGet(0, TimeUnit.SECONDS));
+                assertNull(future1.actionGet(10, TimeUnit.SECONDS));
+            } else {
+                var exception = expectThrows(EsRejectedExecutionException.class, () -> future2.actionGet(0, TimeUnit.SECONDS));
+                assertEquals(1, exception.getSuppressed().length);
+                assertThat(exception.getSuppressed()[0], instanceOf(ElasticsearchException.class));
+                assertEquals(
+                    "simulated",
+                    expectThrows(ElasticsearchException.class, () -> future1.actionGet(10, TimeUnit.SECONDS)).getMessage()
+                );
+            }
+        } finally {
+            barrier.reset();
+            terminate(executorService);
+        }
+    }
 }