Explorar o código

Use rejection exception in ThreadedActionListener (#94363)

Today if the exceptional completion of a `ThreadedActionListener` is
rejected from its executor then the listener is completed with the
original exception on the completing thread, with the exception
representing the rejection added to its suppressed exceptions list. In
practice this is a little trappy, we may want to handle the rejection
differently but won't always remember to check the suppressed exceptions
list for a rejection.

This commit reverses the order of exceptions passed to the delegate
listener in this case: the rejection exception is at the top level, with
the original exception added to its suppressed exceptions list.
David Turner %!s(int64=2) %!d(string=hai) anos
pai
achega
ba671e4871

+ 6 - 6
server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java

@@ -71,13 +71,13 @@ public final class ThreadedActionListener<Response> implements ActionListener<Re
             }
 
             @Override
-            public void onRejection(Exception e2) {
-                e.addSuppressed(e2);
+            public void onRejection(Exception rejectionException) {
+                rejectionException.addSuppressed(e);
                 try {
-                    delegate.onFailure(e);
-                } catch (Exception e3) {
-                    e.addSuppressed(e3);
-                    onFailure(e);
+                    delegate.onFailure(rejectionException);
+                } catch (Exception doubleFailure) {
+                    rejectionException.addSuppressed(doubleFailure);
+                    onFailure(rejectionException);
                 }
             }
 

+ 31 - 1
server/src/test/java/org/elasticsearch/action/support/ThreadedActionListenerTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.FixedExecutorBuilder;
@@ -50,7 +51,36 @@ public class ThreadedActionListenerTests extends ESTestCase {
                     final var listener = new ThreadedActionListener<Void>(
                         threadPool.executor(pool),
                         (pool.equals("fixed-bounded-queue") || pool.startsWith("scaling")) && rarely(),
-                        ActionListener.running(countdownLatch::countDown)
+                        ActionListener.runAfter(new ActionListener<>() {
+                            @Override
+                            public void onResponse(Void ignored) {}
+
+                            @Override
+                            public void onFailure(Exception e) {
+                                assertNull(e.getCause());
+                                if (e instanceof EsRejectedExecutionException esRejectedExecutionException) {
+                                    assertTrue(esRejectedExecutionException.isExecutorShutdown());
+                                    if (e.getSuppressed().length == 0) {
+                                        return;
+                                    }
+                                    assertEquals(1, e.getSuppressed().length);
+                                    if (e.getSuppressed()[0]instanceof ElasticsearchException elasticsearchException) {
+                                        e = elasticsearchException;
+                                        assertNull(e.getCause());
+                                    } else {
+                                        throw new AssertionError("unexpected", e);
+                                    }
+                                }
+
+                                if (e instanceof ElasticsearchException) {
+                                    assertEquals("simulated", e.getMessage());
+                                    assertEquals(0, e.getSuppressed().length);
+                                } else {
+                                    throw new AssertionError("unexpected", e);
+                                }
+
+                            }
+                        }, countdownLatch::countDown)
                     );
                     synchronized (closeFlag) {
                         if (closeFlag.get() && shutdownUnsafePools.contains(pool)) {