Browse Source

Remove ActionListener#wrap(ActionListener) (#103305)

This utility is trappy (it double-completes the listener on failure) and
is hardly used. This commit removes it.
David Turner 1 year ago
parent
commit
42dbc3d795

+ 0 - 23
server/src/main/java/org/elasticsearch/action/ActionListener.java

@@ -184,29 +184,6 @@ public interface ActionListener<Response> {
         };
         };
     }
     }
 
 
-    /**
-     * Adds a wrapper around a listener which catches exceptions thrown by its {@link #onResponse} method and feeds them to its
-     * {@link #onFailure} method.
-     */
-    static <DelegateResponse, Response extends DelegateResponse> ActionListener<Response> wrap(ActionListener<DelegateResponse> delegate) {
-        return new ActionListener<>() {
-            @Override
-            public void onResponse(Response response) {
-                ActionListener.run(delegate, l -> l.onResponse(response));
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                safeOnFailure(delegate, e);
-            }
-
-            @Override
-            public String toString() {
-                return "wrapped{" + delegate + "}";
-            }
-        };
-    }
-
     /**
     /**
      * Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception
      * Notifies every given listener with the response passed to {@link #onResponse(Object)}. If a listener itself throws an exception
      * the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining
      * the exception is forwarded to {@link #onFailure(Exception)}. If in turn {@link #onFailure(Exception)} fails all remaining

+ 2 - 4
server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java

@@ -94,8 +94,7 @@ public class SubscribableListener<T> implements ActionListener<T> {
      * listener immediately with the result with which this listener was completed. Otherwise, the subscribing listener is retained and
      * listener immediately with the result with which this listener was completed. Otherwise, the subscribing listener is retained and
      * completed when this listener is completed.
      * completed when this listener is completed.
      * <p>
      * <p>
-     * Subscribed listeners must not throw any exceptions. Use {@link ActionListener#wrap(ActionListener)} if you have a listener for which
-     * exceptions from its {@link ActionListener#onResponse} method should be handled by its own {@link ActionListener#onFailure} method.
+     * Subscribed listeners must not throw any exceptions.
      * <p>
      * <p>
      * Listeners added strictly before this listener is completed will themselves be completed in the order in which their subscriptions
      * Listeners added strictly before this listener is completed will themselves be completed in the order in which their subscriptions
      * were received. However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with
      * were received. However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with
@@ -113,8 +112,7 @@ public class SubscribableListener<T> implements ActionListener<T> {
      * listener immediately with the result with which this listener was completed. Otherwise, the subscribing listener is retained and
      * listener immediately with the result with which this listener was completed. Otherwise, the subscribing listener is retained and
      * completed when this listener is completed.
      * completed when this listener is completed.
      * <p>
      * <p>
-     * Subscribed listeners must not throw any exceptions. Use {@link ActionListener#wrap(ActionListener)} if you have a listener for which
-     * exceptions from its {@link ActionListener#onResponse} method should be handled by its own {@link ActionListener#onFailure} method.
+     * Subscribed listeners must not throw any exceptions.
      * <p>
      * <p>
      * Listeners added strictly before this listener is completed will themselves be completed in the order in which their subscriptions
      * Listeners added strictly before this listener is completed will themselves be completed in the order in which their subscriptions
      * were received. However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with
      * were received. However, there are no guarantees about the ordering of the completions of listeners which are added concurrently with

+ 7 - 15
server/src/main/java/org/elasticsearch/action/support/replication/PostWriteRefresh.java

@@ -118,21 +118,13 @@ public class PostWriteRefresh {
             return;
             return;
         }
         }
 
 
-        engineOrNull.addFlushListener(location, ActionListener.wrap(new ActionListener<>() {
-            @Override
-            public void onResponse(Long generation) {
-                try (
-                    ThreadContext.StoredContext ignore = transportService.getThreadPool()
-                        .getThreadContext()
-                        .stashWithOrigin(POST_WRITE_REFRESH_ORIGIN)
-                ) {
-                    sendUnpromotableRequests(indexShard, generation, forced, listener, postWriteRefreshTimeout);
-                }
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                listener.onFailure(e);
+        engineOrNull.addFlushListener(location, listener.delegateFailureAndWrap((l, generation) -> {
+            try (
+                ThreadContext.StoredContext ignore = transportService.getThreadPool()
+                    .getThreadContext()
+                    .stashWithOrigin(POST_WRITE_REFRESH_ORIGIN)
+            ) {
+                sendUnpromotableRequests(indexShard, generation, forced, l, postWriteRefreshTimeout);
             }
             }
         }));
         }));
     }
     }

+ 1 - 1
server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java

@@ -307,7 +307,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
     // visible for testing
     // visible for testing
     void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
     void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
         logger.trace("{} fetching [{}] from {}", shardId, type, nodes);
         logger.trace("{} fetching [{}] from {}", shardId, type, nodes);
-        list(shardId, customDataPath, nodes, new ActionListener<BaseNodesResponse<T>>() {
+        list(shardId, customDataPath, nodes, new ActionListener<>() {
             @Override
             @Override
             public void onResponse(BaseNodesResponse<T> response) {
             public void onResponse(BaseNodesResponse<T> response) {
                 assert assertSameNodes(response);
                 assert assertSameNodes(response);

+ 2 - 2
server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java

@@ -261,7 +261,7 @@ public class GatewayAllocator implements ExistingShardsAllocator {
                         client.executeLocally(
                         client.executeLocally(
                             TransportNodesListGatewayStartedShards.TYPE,
                             TransportNodesListGatewayStartedShards.TYPE,
                             new TransportNodesListGatewayStartedShards.Request(shardId, customDataPath, nodes),
                             new TransportNodesListGatewayStartedShards.Request(shardId, customDataPath, nodes),
-                            ActionListener.wrap(listener)
+                            listener.safeMap(r -> r) // weaken type
                         );
                         );
                     }
                     }
                 }
                 }
@@ -308,7 +308,7 @@ public class GatewayAllocator implements ExistingShardsAllocator {
                         client.executeLocally(
                         client.executeLocally(
                             TransportNodesListShardStoreMetadata.TYPE,
                             TransportNodesListShardStoreMetadata.TYPE,
                             new TransportNodesListShardStoreMetadata.Request(shardId, customDataPath, nodes),
                             new TransportNodesListShardStoreMetadata.Request(shardId, customDataPath, nodes),
-                            ActionListener.wrap(listener)
+                            listener.safeMap(r -> r) // weaken type
                         );
                         );
                     }
                     }
                 }
                 }

+ 0 - 43
server/src/test/java/org/elasticsearch/action/ActionListenerTests.java

@@ -107,49 +107,6 @@ public class ActionListenerTests extends ESTestCase {
         );
         );
     }
     }
 
 
-    public void testWrapListener() {
-        var succeeded = new AtomicBoolean();
-        var failed = new AtomicBoolean();
-
-        var listener = ActionListener.wrap(new ActionListener<>() {
-            @Override
-            public void onResponse(Object o) {
-                assertTrue(succeeded.compareAndSet(false, true));
-                if (o instanceof RuntimeException e) {
-                    throw e;
-                }
-            }
-
-            @Override
-            public void onFailure(Exception e) {
-                assertTrue(failed.compareAndSet(false, true));
-                assertEquals("test exception", e.getMessage());
-                if (e instanceof UnsupportedOperationException uoe) {
-                    throw uoe;
-                }
-            }
-
-            @Override
-            public String toString() {
-                return "test listener";
-            }
-        });
-
-        assertEquals("wrapped{test listener}", listener.toString());
-
-        listener.onResponse(new Object());
-        assertTrue(succeeded.getAndSet(false));
-        assertFalse(failed.getAndSet(false));
-
-        listener.onFailure(new RuntimeException("test exception"));
-        assertFalse(succeeded.getAndSet(false));
-        assertTrue(failed.getAndSet(false));
-
-        listener.onResponse(new RuntimeException("test exception"));
-        assertTrue(succeeded.getAndSet(false));
-        assertTrue(failed.getAndSet(false));
-    }
-
     public void testOnResponse() {
     public void testOnResponse() {
         final int numListeners = randomIntBetween(1, 20);
         final int numListeners = randomIntBetween(1, 20);
         List<AtomicReference<Boolean>> refList = new ArrayList<>();
         List<AtomicReference<Boolean>> refList = new ArrayList<>();