Просмотр исходного кода

Fix ReopenWhileClosingIT Deadlocks (#92662)

Instead of blocking either a transport or management thread here (the latter will lead to
a dead-lock if there's only a single management thread available, we should just queue
up those sends and run them async via listener).
This test is currently only working on `main` because of #90193, this makes it work
with a single management thread as well so it works on 7.x again.

closes #92629
Armin Braun 2 лет назад
Родитель
Сommit
24e2f32799

+ 13 - 9
server/src/internalClusterTest/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.indices.state;
 
 import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
 import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
 import org.elasticsearch.cluster.ClusterState;
@@ -16,6 +17,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.Glob;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
@@ -24,6 +26,7 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -128,8 +131,7 @@ public class ReopenWhileClosingIT extends ESIntegTestCase {
             TransportService.class,
             internalCluster().getMasterName()
         );
-
-        final CountDownLatch release = new CountDownLatch(1);
+        final ListenableFuture<Void> release = new ListenableFuture<>();
         for (DiscoveryNode node : internalCluster().clusterService().state().getNodes()) {
             mockTransportService.addSendBehavior(
                 internalCluster().getInstance(TransportService.class, node.getName()),
@@ -140,21 +142,23 @@ public class ReopenWhileClosingIT extends ESIntegTestCase {
                             if (Glob.globMatch(indexPattern, index)) {
                                 logger.info("request {} intercepted for index {}", requestId, index);
                                 onIntercept.run();
-                                try {
-                                    release.await();
+                                release.addListener(ActionListener.wrap(() -> {
                                     logger.info("request {} released for index {}", requestId, index);
-                                } catch (final InterruptedException e) {
-                                    throw new AssertionError(e);
-                                }
+                                    try {
+                                        connection.sendRequest(requestId, action, request, options);
+                                    } catch (IOException e) {
+                                        throw new AssertionError(e);
+                                    }
+                                }));
+                                return;
                             }
                         }
-
                     }
                     connection.sendRequest(requestId, action, request, options);
                 }
             );
         }
-        return Releasables.releaseOnce(release::countDown);
+        return Releasables.releaseOnce(() -> release.onResponse(null));
     }
 
     private static void assertIndexIsBlocked(final String... indices) {