Browse Source

TransportService should capture listener before spawning background notification task

Not doing this made it difficult to establish a happens before relationship between connecting to a node and adding a listeners. Causing test code like this to fail sproadically:

```
        // connection to reuse
        handleA.transportService.connectToNode(handleB.node);

        // install a listener to check that no new connections are made
        handleA.transportService.addConnectionListener(new TransportConnectionListener() {
            @Override
            public void onConnectionOpened(DiscoveryNode node) {
                fail("should not open any connections. got [" + node + "]");
            }
        });

```

relates to #22277
Boaz Leskes 8 years ago
parent
commit
c2baa5f213

+ 11 - 10
core/src/main/java/org/elasticsearch/transport/TransportService.java

@@ -64,6 +64,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Stream;
 
 import static java.util.Collections.emptyList;
 import static org.elasticsearch.common.settings.Setting.listSetting;
@@ -807,20 +808,20 @@ public class TransportService extends AbstractLifecycleComponent {
 
         @Override
         public void onNodeConnected(final DiscoveryNode node) {
-            threadPool.generic().execute(() -> {
-                for (TransportConnectionListener connectionListener : connectionListeners) {
-                    connectionListener.onNodeConnected(node);
-                }
-            });
+            // capture listeners before spawning the background callback so the following pattern won't trigger a call
+            // connectToNode(); connection is completed successfully
+            // addConnectionListener(); this listener shouldn't be called
+            final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
+            threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onNodeConnected(node)));
         }
 
         @Override
         public void onConnectionOpened(DiscoveryNode node) {
-            threadPool.generic().execute(() -> {
-                for (TransportConnectionListener connectionListener : connectionListeners) {
-                    connectionListener.onConnectionOpened(node);
-                }
-            });
+            // capture listeners before spawning the background callback so the following pattern won't trigger a call
+            // connectToNode(); connection is completed successfully
+            // addConnectionListener(); this listener shouldn't be called
+            final Stream<TransportConnectionListener> listenersToNotify = TransportService.this.connectionListeners.stream();
+            threadPool.generic().execute(() -> listenersToNotify.forEach(listener -> listener.onConnectionOpened(node)));
         }
 
         @Override

+ 0 - 2
core/src/test/java/org/elasticsearch/discovery/zen/UnicastZenPingTests.java

@@ -41,7 +41,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.transport.MockTransportService;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -540,7 +539,6 @@ public class UnicastZenPingTests extends ESTestCase {
         }
     }
 
-    @TestLogging("org.elasticsearch:DEBUG,org.elasticsearch.discovery:TRACE,org.elasticsearch.transport:TRACE")
     public void testResolveReuseExistingNodeConnections() throws ExecutionException, InterruptedException {
         final Settings settings = Settings.builder().put("cluster.name", "test").put(TransportSettings.PORT.getKey(), 0).build();