浏览代码

Fix various concurrency issues in transport (#19675)

Due to various issues (most notably a missing happens-before edge
between socket accept and channel close in MockTcpTransport),
MockTcpTransportTests sometimes did not terminate.

With this commit we fix various concurrency issues that led to
this hanging test.

Failing example build: https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+multijob-os-compatibility/os=oraclelinux/835/console
Daniel Mitterdorfer 9 年之前
父节点
当前提交
4598c36027

+ 8 - 17
core/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -93,7 +93,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
 
 
@@ -768,12 +767,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
         threadPool.generic().execute(() -> {
         threadPool.generic().execute(() -> {
             globalLock.writeLock().lock();
             globalLock.writeLock().lock();
             try {
             try {
-                for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
-                    NodeChannels nodeChannels = it.next();
-                    it.remove();
-                    IOUtils.closeWhileHandlingException(nodeChannels);
-                }
-
+                // first stop to accept any incoming connections so nobody can connect to this transport
                 for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
                 for (Map.Entry<String, List<Channel>> entry : serverChannels.entrySet()) {
                     try {
                     try {
                         closeChannels(entry.getValue());
                         closeChannels(entry.getValue());
@@ -781,16 +775,13 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
                         logger.debug("Error closing serverChannel for profile [{}]", e, entry.getKey());
                         logger.debug("Error closing serverChannel for profile [{}]", e, entry.getKey());
                     }
                     }
                 }
                 }
-                try {
-                    stopInternal();
-                } finally {
-                    for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
-                        NodeChannels nodeChannels = it.next();
-                        it.remove();
-                        IOUtils.closeWhileHandlingException(nodeChannels);
-                    }
-                }
 
 
+                for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext(); ) {
+                    NodeChannels nodeChannels = it.next();
+                    it.remove();
+                    IOUtils.closeWhileHandlingException(nodeChannels);
+                }
+                stopInternal();
             } finally {
             } finally {
                 globalLock.writeLock().unlock();
                 globalLock.writeLock().unlock();
                 latch.countDown();
                 latch.countDown();
@@ -800,7 +791,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
         try {
         try {
             latch.await(30, TimeUnit.SECONDS);
             latch.await(30, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
         } catch (InterruptedException e) {
-            Thread.interrupted();
+            Thread.currentThread().interrupt();
             // ignore
             // ignore
         }
         }
     }
     }

+ 4 - 7
test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java

@@ -69,12 +69,12 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
     protected ThreadPool threadPool;
     protected ThreadPool threadPool;
 
 
     protected static final Version version0 = Version.CURRENT.minimumCompatibilityVersion();
     protected static final Version version0 = Version.CURRENT.minimumCompatibilityVersion();
-    protected DiscoveryNode nodeA;
-    protected MockTransportService serviceA;
+    protected volatile DiscoveryNode nodeA;
+    protected volatile MockTransportService serviceA;
 
 
     protected static final Version version1 = Version.fromId(Version.CURRENT.id + 1);
     protected static final Version version1 = Version.fromId(Version.CURRENT.id + 1);
-    protected DiscoveryNode nodeB;
-    protected MockTransportService serviceB;
+    protected volatile DiscoveryNode nodeB;
+    protected volatile MockTransportService serviceB;
 
 
     protected abstract MockTransportService build(Settings settings, Version version);
     protected abstract MockTransportService build(Settings settings, Version version);
 
 
@@ -489,9 +489,6 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
         assertThat(latch.await(5, TimeUnit.SECONDS), equalTo(true));
         assertThat(latch.await(5, TimeUnit.SECONDS), equalTo(true));
     }
     }
 
 
-    @TestLogging("transport:DEBUG,transport.tracer:TRACE")
-    // boaz is on this
-    @AwaitsFix(bugUrl = "https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+multijob-os-compatibility/os=oraclelinux/835")
     public void testConcurrentSendRespondAndDisconnect() throws BrokenBarrierException, InterruptedException {
     public void testConcurrentSendRespondAndDisconnect() throws BrokenBarrierException, InterruptedException {
         Set<Exception> sendingErrors = ConcurrentCollections.newConcurrentSet();
         Set<Exception> sendingErrors = ConcurrentCollections.newConcurrentSet();
         Set<Exception> responseErrors = ConcurrentCollections.newConcurrentSet();
         Set<Exception> responseErrors = ConcurrentCollections.newConcurrentSet();

+ 52 - 17
test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java

@@ -122,7 +122,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
         try {
         try {
             started.await();
             started.await();
         } catch (InterruptedException e) {
         } catch (InterruptedException e) {
-            Thread.interrupted();
+            Thread.currentThread().interrupt();
         }
         }
         return serverMockChannel;
         return serverMockChannel;
     }
     }
@@ -261,6 +261,14 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
         private final CancellableThreads cancellableThreads = new CancellableThreads();
         private final CancellableThreads cancellableThreads = new CancellableThreads();
         private final Closeable onClose;
         private final Closeable onClose;
 
 
+        /**
+         * Constructs a new MockChannel instance intended for handling the actual incoming / outgoing traffic.
+         *
+         * @param socket The client socket. Mut not be null.
+         * @param localAddress Address associated with the corresponding local server socket. Must not be null.
+         * @param profile The associated profile name.
+         * @param onClose Callback to execute when this channel is closed.
+         */
         public MockChannel(Socket socket, InetSocketAddress localAddress, String profile, Consumer<MockChannel> onClose) {
         public MockChannel(Socket socket, InetSocketAddress localAddress, String profile, Consumer<MockChannel> onClose) {
             this.localAddress = localAddress;
             this.localAddress = localAddress;
             this.activeChannel = socket;
             this.activeChannel = socket;
@@ -268,13 +276,44 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
             this.profile = profile;
             this.profile = profile;
             this.onClose = () -> onClose.accept(this);
             this.onClose = () -> onClose.accept(this);
         }
         }
+
+        /**
+         * Constructs a new MockChannel instance intended for accepting requests.
+         *
+         * @param serverSocket The associated server socket. Must not be null.
+         * @param profile The associated profile name.
+         */
+        public MockChannel(ServerSocket serverSocket, String profile) {
+            this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
+            this.serverSocket = serverSocket;
+            this.profile = profile;
+            this.activeChannel = null;
+            this.onClose = null;
+        }
+
         public void accept(Executor executor) throws IOException {
         public void accept(Executor executor) throws IOException {
             while (isOpen.get()) {
             while (isOpen.get()) {
-                Socket accept = serverSocket.accept();
-                configureSocket(accept);
-                MockChannel mockChannel = new MockChannel(accept, localAddress, profile, workerChannels::remove);
-                workerChannels.put(mockChannel, Boolean.TRUE);
-                mockChannel.loopRead(executor);
+                Socket incomingSocket = serverSocket.accept();
+                MockChannel incomingChannel = null;
+                try {
+                    configureSocket(incomingSocket);
+                    incomingChannel = new MockChannel(incomingSocket, localAddress, profile, workerChannels::remove);
+                    //establish a happens-before edge between closing and accepting a new connection
+                    synchronized (this) {
+                        if (isOpen.get()) {
+                            workerChannels.put(incomingChannel, Boolean.TRUE);
+                            // this spawns a new thread immediately, so OK under lock
+                            incomingChannel.loopRead(executor);
+                            // the channel is properly registered and will be cleared by the close code.
+                            incomingSocket = null;
+                            incomingChannel = null;
+                        }
+                    }
+                } finally {
+                    // ensure we don't leak sockets and channels in the failure case. Note that we null both
+                    // if there are no exceptions so this becomes a no op.
+                    IOUtils.closeWhileHandlingException(incomingSocket, incomingChannel);
+                }
             }
             }
         }
         }
 
 
@@ -294,26 +333,22 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
                 @Override
                 @Override
                 protected void doRun() throws Exception {
                 protected void doRun() throws Exception {
                     StreamInput input = new InputStreamStreamInput(new BufferedInputStream(activeChannel.getInputStream()));
                     StreamInput input = new InputStreamStreamInput(new BufferedInputStream(activeChannel.getInputStream()));
-                    while (isOpen.get()) {
+                    // There is a (slim) chance that we get interrupted right after a loop iteration, so check explicitly
+                    while (isOpen.get() && !Thread.currentThread().isInterrupted()) {
                         cancellableThreads.executeIO(() -> readMessage(MockChannel.this, input));
                         cancellableThreads.executeIO(() -> readMessage(MockChannel.this, input));
                     }
                     }
                 }
                 }
             });
             });
         }
         }
 
 
-        public MockChannel(ServerSocket serverSocket, String profile) {
-            this.localAddress = (InetSocketAddress) serverSocket.getLocalSocketAddress();
-            this.serverSocket = serverSocket;
-            this.profile = profile;
-            this.activeChannel = null;
-            this.onClose = null;
-        }
-
         @Override
         @Override
         public void close() throws IOException {
         public void close() throws IOException {
             if (isOpen.compareAndSet(true, false)) {
             if (isOpen.compareAndSet(true, false)) {
-                IOUtils.close( () -> cancellableThreads.cancel("channel closed"), serverSocket, activeChannel,
-                    () -> IOUtils.close(workerChannels.keySet()), onClose);
+                //establish a happens-before edge between closing and accepting a new connection
+                synchronized (this) {
+                    IOUtils.close(serverSocket, activeChannel, () -> IOUtils.close(workerChannels.keySet()),
+                        () -> cancellableThreads.cancel("channel closed"), onClose);
+                }
             }
             }
         }
         }
     }
     }