Browse Source

Improve some `execute(Runnable)` invocations (#111832)

Fixes a few spots where we're submitting to an executor a bare
`Runnable` that completes a listener, replacing them all with an
appropriate `ActionRunnable` util.
David Turner 1 year ago
parent
commit
aa24d02a29

+ 2 - 1
server/src/main/java/org/elasticsearch/indices/IndicesService.java

@@ -18,6 +18,7 @@ import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.ResolvedIndices;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.TransportAutoPutMappingAction;
@@ -399,7 +400,7 @@ public class IndicesService extends AbstractLifecycleComponent
         final CountDownLatch latch = new CountDownLatch(indices.size());
         for (final Index index : indices) {
             indicesStopExecutor.execute(
-                () -> ActionListener.run(
+                ActionRunnable.wrap(
                     ActionListener.assertOnce(ActionListener.<Void>releasing(latch::countDown)),
                     l -> removeIndex(
                         index,

+ 1 - 3
server/src/test/java/org/elasticsearch/action/ActionRunnableTests.java

@@ -47,11 +47,9 @@ public class ActionRunnableTests extends ESTestCase {
                 assertEquals("simulated", e.getMessage());
                 assertTrue(releaseListener.isDone());
                 l.onResponse(null);
-            }), () -> safeReleaseListener.onResponse(null), l -> executor.execute(() -> ActionListener.completeWith(l, () -> {
+            }), () -> safeReleaseListener.onResponse(null), l -> executor.execute(ActionRunnable.run(l, () -> {
                 if (randomBoolean()) {
                     throw new ElasticsearchException("simulated");
-                } else {
-                    return null;
                 }
             }))));
 

+ 3 - 2
server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

@@ -13,6 +13,7 @@ import org.elasticsearch.Build;
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -607,12 +608,12 @@ public class NodeConnectionsServiceTests extends ESTestCase {
         public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
             final CheckedRunnable<Exception> connectionBlock = nodeConnectionBlocks.get(node);
             if (profile == null && randomConnectionExceptions && randomBoolean()) {
-                threadPool.generic().execute(() -> ActionListener.completeWith(listener, () -> {
+                threadPool.generic().execute(ActionRunnable.run(listener, () -> {
                     runConnectionBlock(connectionBlock);
                     throw new ConnectTransportException(node, "simulated");
                 }));
             } else {
-                threadPool.generic().execute(() -> ActionListener.completeWith(listener, () -> {
+                threadPool.generic().execute(ActionRunnable.supply(listener, () -> {
                     runConnectionBlock(connectionBlock);
                     return new Connection() {
                         private final SubscribableListener<Void> closeListener = new SubscribableListener<>();

+ 2 - 1
server/src/test/java/org/elasticsearch/common/util/CancellableSingleObjectCacheTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.common.util;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -202,7 +203,7 @@ public class CancellableSingleObjectCacheTests extends ESTestCase {
                     BooleanSupplier supersedeIfStale,
                     ActionListener<Integer> listener
                 ) {
-                    threadPool.generic().execute(() -> ActionListener.completeWith(listener, () -> {
+                    threadPool.generic().execute(ActionRunnable.supply(listener, () -> {
                         ensureNotCancelled.run();
                         if (s.equals("FAIL")) {
                             throw new ElasticsearchException("simulated");