Просмотр исходного кода

Close connection manager on current thread in RemoteClusterConnection (#44805)

The problem is that RemoteClusterConnection closes the connection manager asynchronously, which races with the threadpool being shutdown at the end of the test.

Closes #44339
Closes #44610
Yannick Welsch 6 лет назад
Родитель
Сommit
49825cff6d

+ 22 - 12
server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

@@ -58,6 +58,15 @@ public class ConnectionManager implements Closeable {
     private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
         @Override
         protected void closeInternal() {
+            Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
+                try {
+                    IOUtils.closeWhileHandlingException(next.getValue());
+                } finally {
+                    iterator.remove();
+                }
+            }
             closeLatch.countDown();
         }
     };
@@ -249,22 +258,23 @@ public class ConnectionManager implements Closeable {
 
     @Override
     public void close() {
+        internalClose(true);
+    }
+
+    public void closeNoBlock() {
+        internalClose(false);
+    }
+
+    private void internalClose(boolean waitForPendingConnections) {
         assert Transports.assertNotTransportThread("Closing ConnectionManager");
         if (closing.compareAndSet(false, true)) {
             connectingRefCounter.decRef();
-            try {
-                closeLatch.await();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new IllegalStateException(e);
-            }
-            Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
+            if (waitForPendingConnections) {
                 try {
-                    IOUtils.closeWhileHandlingException(next.getValue());
-                } finally {
-                    iterator.remove();
+                    closeLatch.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new IllegalStateException(e);
                 }
             }
         }

+ 1 - 2
server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

@@ -332,8 +332,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
     @Override
     public void close() throws IOException {
         IOUtils.close(connectHandler);
-        // In the ConnectionManager we wait on connections being closed.
-        threadPool.generic().execute(connectionManager::close);
+        connectionManager.closeNoBlock();
     }
 
     public boolean isClosed() {

+ 1 - 9
server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

@@ -64,7 +64,6 @@ import org.elasticsearch.test.transport.StubbableConnectionManager;
 import org.elasticsearch.test.transport.StubbableTransport;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
-import org.junit.Before;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -94,6 +93,7 @@ import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.iterableWithSize;
@@ -101,7 +101,6 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.hamcrest.Matchers.startsWith;
-import static org.hamcrest.Matchers.endsWith;
 
 public class RemoteClusterConnectionTests extends ESTestCase {
 
@@ -114,13 +113,6 @@ public class RemoteClusterConnectionTests extends ESTestCase {
         ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
     }
 
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-        assumeFalse("https://github.com/elastic/elasticsearch/issues/44339", System.getProperty("os.name").contains("Win"));
-    }
-
     private MockTransportService startTransport(String id, List<DiscoveryNode> knownNodes, Version version) {
         return startTransport(id, knownNodes, version, threadPool);
     }

+ 0 - 6
x-pack/plugin/ccr/build.gradle

@@ -1,5 +1,3 @@
-import org.elasticsearch.gradle.OS
-
 evaluationDependsOn(xpackModule('core'))
 
 apply plugin: 'elasticsearch.esplugin'
@@ -24,8 +22,6 @@ task internalClusterTestNoSecurityManager(type: Test) {
     include noSecurityManagerITClasses
     systemProperty 'es.set.netty.runtime.available.processors', 'false'
     systemProperty 'tests.security.manager', 'false'
-    // Disable tests on windows https://github.com/elastic/elasticsearch/issues/44610
-    onlyIf { OS.WINDOWS.equals(OS.current()) == false }
 }
 
 // Instead we create a separate task to run the
@@ -38,8 +34,6 @@ task internalClusterTest(type: Test) {
     include '**/*IT.class'
     exclude noSecurityManagerITClasses
     systemProperty 'es.set.netty.runtime.available.processors', 'false'
-    // Disable tests on windows https://github.com/elastic/elasticsearch/issues/44610
-    onlyIf { OS.WINDOWS.equals(OS.current()) == false }
 }
 
 check.dependsOn internalClusterTest