Browse Source

Remove `CompletableFuture` from `Node#prepareForClose` (#111846)

More of a style thing than anything else given that there's no risk of
catching an `Error` here, but still generally there are alternatives to
`CompletableFuture` which we prefer.
David Turner 1 year ago
parent
commit
a63a2f7b00
1 changed files with 51 additions and 35 deletions
  1. 51 35
      server/src/main/java/org/elasticsearch/node/Node.java

+ 51 - 35
server/src/main/java/org/elasticsearch/node/Node.java

@@ -13,6 +13,9 @@ import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.search.TransportSearchAction;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.RefCountingListener;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.bootstrap.BootstrapCheck;
 import org.elasticsearch.bootstrap.BootstrapCheck;
 import org.elasticsearch.bootstrap.BootstrapContext;
 import org.elasticsearch.bootstrap.BootstrapContext;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.Client;
@@ -40,7 +43,6 @@ import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.BoundTransportAddress;
 import org.elasticsearch.common.transport.BoundTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.PathUtils;
 import org.elasticsearch.core.PathUtils;
@@ -101,11 +103,14 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 
 import javax.net.ssl.SNIHostName;
 import javax.net.ssl.SNIHostName;
 
 
@@ -591,52 +596,63 @@ public class Node implements Closeable {
      * Invokes hooks to prepare this node to be closed. This should be called when Elasticsearch receives a request to shut down
      * Invokes hooks to prepare this node to be closed. This should be called when Elasticsearch receives a request to shut down
      * gracefully from the underlying operating system, before system resources are closed. This method will block
      * gracefully from the underlying operating system, before system resources are closed. This method will block
      * until the node is ready to shut down.
      * until the node is ready to shut down.
-     *
+     * <p>
      * Note that this class is part of infrastructure to react to signals from the operating system - most graceful shutdown
      * Note that this class is part of infrastructure to react to signals from the operating system - most graceful shutdown
      * logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}.
      * logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}.
      */
      */
     public void prepareForClose() {
     public void prepareForClose() {
-        HttpServerTransport httpServerTransport = injector.getInstance(HttpServerTransport.class);
-        Map<String, Runnable> stoppers = new HashMap<>();
-        TimeValue maxTimeout = MAXIMUM_SHUTDOWN_TIMEOUT_SETTING.get(this.settings());
-        stoppers.put("http-server-transport-stop", httpServerTransport::close);
-        stoppers.put("async-search-stop", () -> this.awaitSearchTasksComplete(maxTimeout));
-        if (terminationHandler != null) {
-            stoppers.put("termination-handler-stop", terminationHandler::handleTermination);
+        final var maxTimeout = MAXIMUM_SHUTDOWN_TIMEOUT_SETTING.get(this.settings());
+
+        record Stopper(String name, SubscribableListener<Void> listener) {
+            boolean isIncomplete() {
+                return listener().isDone() == false;
+            }
         }
         }
 
 
-        Map<String, CompletableFuture<Void>> futures = new HashMap<>(stoppers.size());
-        for (var stopperEntry : stoppers.entrySet()) {
-            var future = new CompletableFuture<Void>();
-            new Thread(() -> {
-                try {
-                    stopperEntry.getValue().run();
-                } catch (Exception ex) {
-                    logger.warn("unexpected exception in shutdown task [" + stopperEntry.getKey() + "]", ex);
-                } finally {
-                    future.complete(null);
-                }
-            }, stopperEntry.getKey()).start();
-            futures.put(stopperEntry.getKey(), future);
+        final var stoppers = new ArrayList<Stopper>();
+        final var allStoppersFuture = new PlainActionFuture<Void>();
+        try (var listeners = new RefCountingListener(allStoppersFuture)) {
+            final BiConsumer<String, Runnable> stopperRunner = (name, action) -> {
+                final var stopper = new Stopper(name, new SubscribableListener<>());
+                stoppers.add(stopper);
+                stopper.listener().addListener(listeners.acquire());
+                new Thread(() -> {
+                    try {
+                        action.run();
+                    } catch (Exception ex) {
+                        logger.warn("unexpected exception in shutdown task [" + stopper.name() + "]", ex);
+                    } finally {
+                        stopper.listener().onResponse(null);
+                    }
+                }, stopper.name()).start();
+            };
+
+            stopperRunner.accept("http-server-transport-stop", injector.getInstance(HttpServerTransport.class)::close);
+            stopperRunner.accept("async-search-stop", () -> awaitSearchTasksComplete(maxTimeout));
+            if (terminationHandler != null) {
+                stopperRunner.accept("termination-handler-stop", terminationHandler::handleTermination);
+            }
         }
         }
 
 
-        @SuppressWarnings(value = "rawtypes") // Can't make an array of parameterized types, but it complains if you leave the type out
-        CompletableFuture<Void> allStoppers = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[stoppers.size()]));
+        final Supplier<String> incompleteStoppersDescriber = () -> stoppers.stream()
+            .filter(Stopper::isIncomplete)
+            .map(Stopper::name)
+            .collect(Collectors.joining(", ", "[", "]"));
 
 
         try {
         try {
             if (TimeValue.ZERO.equals(maxTimeout)) {
             if (TimeValue.ZERO.equals(maxTimeout)) {
-                FutureUtils.get(allStoppers);
+                allStoppersFuture.get();
             } else {
             } else {
-                FutureUtils.get(allStoppers, maxTimeout.millis(), TimeUnit.MILLISECONDS);
+                allStoppersFuture.get(maxTimeout.millis(), TimeUnit.MILLISECONDS);
             }
             }
-
-        } catch (ElasticsearchTimeoutException t) {
-            var unfinishedTasks = futures.entrySet()
-                .stream()
-                .filter(entry -> entry.getValue().isDone() == false)
-                .map(Map.Entry::getKey)
-                .toList();
-            logger.warn("timed out while waiting for graceful shutdown tasks: " + unfinishedTasks);
+        } catch (ExecutionException e) {
+            assert false : e; // listeners are never completed exceptionally
+            logger.warn("failed during graceful shutdown tasks", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("interrupted while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get(), e);
+        } catch (TimeoutException e) {
+            logger.warn("timed out while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get());
         }
         }
     }
     }