|  | @@ -28,6 +28,7 @@ import org.elasticsearch.common.network.NetworkAddress;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.network.NetworkService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.settings.Settings;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.transport.InetSocketTransportAddress;
 | 
	
		
			
				|  |  | +import org.elasticsearch.common.transport.TransportAddress;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.unit.TimeValue;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.BigArrays;
 | 
	
		
			
				|  |  |  import org.elasticsearch.discovery.zen.elect.ElectMasterService;
 | 
	
	
		
			
				|  | @@ -35,16 +36,22 @@ import org.elasticsearch.discovery.zen.ping.PingContextProvider;
 | 
	
		
			
				|  |  |  import org.elasticsearch.discovery.zen.ping.ZenPing;
 | 
	
		
			
				|  |  |  import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.test.ESTestCase;
 | 
	
		
			
				|  |  | +import org.elasticsearch.test.VersionUtils;
 | 
	
		
			
				|  |  |  import org.elasticsearch.threadpool.ThreadPool;
 | 
	
		
			
				|  |  | +import org.elasticsearch.transport.TransportConnectionListener;
 | 
	
		
			
				|  |  |  import org.elasticsearch.transport.TransportService;
 | 
	
		
			
				|  |  |  import org.elasticsearch.transport.TransportSettings;
 | 
	
		
			
				|  |  |  import org.elasticsearch.transport.netty.NettyTransport;
 | 
	
		
			
				|  |  | +import org.jboss.netty.util.internal.ConcurrentHashMap;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import java.net.InetSocketAddress;
 | 
	
		
			
				|  |  | +import java.util.concurrent.ConcurrentMap;
 | 
	
		
			
				|  |  | +import java.util.concurrent.atomic.AtomicInteger;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import static java.util.Collections.emptyMap;
 | 
	
		
			
				|  |  |  import static java.util.Collections.emptySet;
 | 
	
		
			
				|  |  |  import static org.hamcrest.Matchers.equalTo;
 | 
	
		
			
				|  |  | +import static org.hamcrest.Matchers.greaterThan;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  public class UnicastZenPingIT extends ESTestCase {
 | 
	
		
			
				|  |  |      public void testSimplePings() throws InterruptedException {
 | 
	
	
		
			
				|  | @@ -54,36 +61,31 @@ public class UnicastZenPingIT extends ESTestCase {
 | 
	
		
			
				|  |  |          settings = Settings.builder().put(settings).put(TransportSettings.PORT.getKey(), startPort + "-" + endPort).build();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          ThreadPool threadPool = new ThreadPool(getClass().getName());
 | 
	
		
			
				|  |  | -        ClusterName clusterName = new ClusterName("test");
 | 
	
		
			
				|  |  | +        ClusterName test = new ClusterName("test");
 | 
	
		
			
				|  |  | +        ClusterName mismatch = new ClusterName("mismatch");
 | 
	
		
			
				|  |  |          NetworkService networkService = new NetworkService(settings);
 | 
	
		
			
				|  |  |          ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService());
 | 
	
		
			
				|  |  | -        final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
 | 
	
		
			
				|  |  | -        transportServiceA.acceptIncomingRequests();
 | 
	
		
			
				|  |  | -        final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(),
 | 
	
		
			
				|  |  | -                emptyMap(), emptySet(), Version.CURRENT);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress();
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService());
 | 
	
		
			
				|  |  | -        final TransportService transportServiceB = new TransportService(transportB, threadPool).start();
 | 
	
		
			
				|  |  | -        transportServiceB.acceptIncomingRequests();
 | 
	
		
			
				|  |  | -        final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(),
 | 
	
		
			
				|  |  | -                emptyMap(), emptySet(), Version.CURRENT);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        InetSocketTransportAddress addressB = (InetSocketTransportAddress) transportB.boundAddress().publishAddress();
 | 
	
		
			
				|  |  | +        NetworkHandle handleA = startServices(settings, threadPool, networkService, "UZP_A", test, Version.CURRENT);
 | 
	
		
			
				|  |  | +        NetworkHandle handleB = startServices(settings, threadPool, networkService, "UZP_B", test, Version.CURRENT);
 | 
	
		
			
				|  |  | +        NetworkHandle handleC = startServices(settings, threadPool, networkService, "UZP_C", new ClusterName("mismatch"), Version.CURRENT);
 | 
	
		
			
				|  |  | +        // just fake that no versions are compatible with this node
 | 
	
		
			
				|  |  | +        Version previousVersion = VersionUtils.getPreviousVersion(Version.CURRENT.minimumCompatibilityVersion());
 | 
	
		
			
				|  |  | +        Version versionD = VersionUtils.randomVersionBetween(random(), previousVersion.minimumCompatibilityVersion(), previousVersion);
 | 
	
		
			
				|  |  | +        NetworkHandle handleD = startServices(settings, threadPool, networkService, "UZP_D", test, versionD);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          Settings hostsSettings = Settings.builder().putArray("discovery.zen.ping.unicast.hosts",
 | 
	
		
			
				|  |  | -                NetworkAddress.format(new InetSocketAddress(addressA.address().getAddress(), addressA.address().getPort())),
 | 
	
		
			
				|  |  | -                NetworkAddress.format(new InetSocketAddress(addressB.address().getAddress(), addressB.address().getPort())))
 | 
	
		
			
				|  |  | +                NetworkAddress.format(new InetSocketAddress(handleA.address.address().getAddress(), handleA.address.address().getPort())),
 | 
	
		
			
				|  |  | +                NetworkAddress.format(new InetSocketAddress(handleB.address.address().getAddress(), handleB.address.address().getPort())),
 | 
	
		
			
				|  |  | +                NetworkAddress.format(new InetSocketAddress(handleC.address.address().getAddress(), handleC.address.address().getPort())),
 | 
	
		
			
				|  |  | +                NetworkAddress.format(new InetSocketAddress(handleD.address.address().getAddress(), handleD.address.address().getPort())))
 | 
	
		
			
				|  |  |                  .build();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, transportServiceA, clusterName, Version.CURRENT, electMasterService, null);
 | 
	
		
			
				|  |  | +        UnicastZenPing zenPingA = new UnicastZenPing(hostsSettings, threadPool, handleA.transportService, test, Version.CURRENT, electMasterService, null);
 | 
	
		
			
				|  |  |          zenPingA.setPingContextProvider(new PingContextProvider() {
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  |              public DiscoveryNodes nodes() {
 | 
	
		
			
				|  |  | -                return DiscoveryNodes.builder().put(nodeA).localNodeId("UZP_A").build();
 | 
	
		
			
				|  |  | +                return DiscoveryNodes.builder().put(handleA.node).localNodeId("UZP_A").build();
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              @Override
 | 
	
	
		
			
				|  | @@ -93,11 +95,11 @@ public class UnicastZenPingIT extends ESTestCase {
 | 
	
		
			
				|  |  |          });
 | 
	
		
			
				|  |  |          zenPingA.start();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, transportServiceB, clusterName, Version.CURRENT, electMasterService, null);
 | 
	
		
			
				|  |  | +        UnicastZenPing zenPingB = new UnicastZenPing(hostsSettings, threadPool, handleB.transportService, test, Version.CURRENT, electMasterService, null);
 | 
	
		
			
				|  |  |          zenPingB.setPingContextProvider(new PingContextProvider() {
 | 
	
		
			
				|  |  |              @Override
 | 
	
		
			
				|  |  |              public DiscoveryNodes nodes() {
 | 
	
		
			
				|  |  | -                return DiscoveryNodes.builder().put(nodeB).localNodeId("UZP_B").build();
 | 
	
		
			
				|  |  | +                return DiscoveryNodes.builder().put(handleB.node).localNodeId("UZP_B").build();
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              @Override
 | 
	
	
		
			
				|  | @@ -107,12 +109,41 @@ public class UnicastZenPingIT extends ESTestCase {
 | 
	
		
			
				|  |  |          });
 | 
	
		
			
				|  |  |          zenPingB.start();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        UnicastZenPing zenPingC = new UnicastZenPing(hostsSettings, threadPool, handleC.transportService, mismatch, versionD, electMasterService, null);
 | 
	
		
			
				|  |  | +        zenPingC.setPingContextProvider(new PingContextProvider() {
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public DiscoveryNodes nodes() {
 | 
	
		
			
				|  |  | +                return DiscoveryNodes.builder().put(handleC.node).localNodeId("UZP_C").build();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public boolean nodeHasJoinedClusterOnce() {
 | 
	
		
			
				|  |  | +                return false;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +        zenPingC.start();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        UnicastZenPing zenPingD = new UnicastZenPing(hostsSettings, threadPool, handleD.transportService, mismatch, Version.CURRENT, electMasterService, null);
 | 
	
		
			
				|  |  | +        zenPingD.setPingContextProvider(new PingContextProvider() {
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public DiscoveryNodes nodes() {
 | 
	
		
			
				|  |  | +                return DiscoveryNodes.builder().put(handleD.node).localNodeId("UZP_D").build();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public boolean nodeHasJoinedClusterOnce() {
 | 
	
		
			
				|  |  | +                return false;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +        zenPingD.start();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |          try {
 | 
	
		
			
				|  |  |              logger.info("ping from UZP_A");
 | 
	
		
			
				|  |  |              ZenPing.PingResponse[] pingResponses = zenPingA.pingAndWait(TimeValue.timeValueSeconds(10));
 | 
	
		
			
				|  |  |              assertThat(pingResponses.length, equalTo(1));
 | 
	
		
			
				|  |  |              assertThat(pingResponses[0].node().getId(), equalTo("UZP_B"));
 | 
	
		
			
				|  |  |              assertTrue(pingResponses[0].hasJoinedOnce());
 | 
	
		
			
				|  |  | +            assertCounters(handleA, handleA, handleB, handleC, handleD);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              // ping again, this time from B,
 | 
	
		
			
				|  |  |              logger.info("ping from UZP_B");
 | 
	
	
		
			
				|  | @@ -120,13 +151,72 @@ public class UnicastZenPingIT extends ESTestCase {
 | 
	
		
			
				|  |  |              assertThat(pingResponses.length, equalTo(1));
 | 
	
		
			
				|  |  |              assertThat(pingResponses[0].node().getId(), equalTo("UZP_A"));
 | 
	
		
			
				|  |  |              assertFalse(pingResponses[0].hasJoinedOnce());
 | 
	
		
			
				|  |  | +            assertCounters(handleB, handleA, handleB, handleC, handleD);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +            logger.info("ping from UZP_C");
 | 
	
		
			
				|  |  | +            pingResponses = zenPingC.pingAndWait(TimeValue.timeValueSeconds(10));
 | 
	
		
			
				|  |  | +            assertThat(pingResponses.length, equalTo(0));
 | 
	
		
			
				|  |  | +            assertCounters(handleC, handleA, handleB, handleC, handleD);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            logger.info("ping from UZP_D");
 | 
	
		
			
				|  |  | +            pingResponses = zenPingD.pingAndWait(TimeValue.timeValueSeconds(10));
 | 
	
		
			
				|  |  | +            assertThat(pingResponses.length, equalTo(0));
 | 
	
		
			
				|  |  | +            assertCounters(handleD, handleA, handleB, handleC, handleD);
 | 
	
		
			
				|  |  |          } finally {
 | 
	
		
			
				|  |  |              zenPingA.close();
 | 
	
		
			
				|  |  |              zenPingB.close();
 | 
	
		
			
				|  |  | -            transportServiceA.close();
 | 
	
		
			
				|  |  | -            transportServiceB.close();
 | 
	
		
			
				|  |  | +            zenPingC.close();
 | 
	
		
			
				|  |  | +            zenPingD.close();
 | 
	
		
			
				|  |  | +            handleA.transportService.close();
 | 
	
		
			
				|  |  | +            handleB.transportService.close();
 | 
	
		
			
				|  |  | +            handleC.transportService.close();
 | 
	
		
			
				|  |  | +            handleD.transportService.close();
 | 
	
		
			
				|  |  |              terminate(threadPool);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    // assert that we tried to ping each of the configured nodes at least once
 | 
	
		
			
				|  |  | +    private void assertCounters(NetworkHandle that, NetworkHandle...handles) {
 | 
	
		
			
				|  |  | +        for (NetworkHandle handle : handles) {
 | 
	
		
			
				|  |  | +            if (handle != that) {
 | 
	
		
			
				|  |  | +                assertThat(that.counters.get(handle.address).get(), greaterThan(0));
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private NetworkHandle startServices(Settings settings, ThreadPool threadPool, NetworkService networkService, String nodeId, ClusterName clusterName, Version version) {
 | 
	
		
			
				|  |  | +        NettyTransport transport = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, version, new NamedWriteableRegistry(), new NoneCircuitBreakerService());
 | 
	
		
			
				|  |  | +        final TransportService transportService = new TransportService(transport, threadPool, clusterName);
 | 
	
		
			
				|  |  | +        transportService.start();
 | 
	
		
			
				|  |  | +        transportService.acceptIncomingRequests();
 | 
	
		
			
				|  |  | +        ConcurrentMap<TransportAddress, AtomicInteger> counters = new ConcurrentHashMap<>();
 | 
	
		
			
				|  |  | +        transportService.addConnectionListener(new TransportConnectionListener() {
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public void onNodeConnected(DiscoveryNode node) {
 | 
	
		
			
				|  |  | +                counters.computeIfAbsent(node.getAddress(), k -> new AtomicInteger());
 | 
	
		
			
				|  |  | +                counters.get(node.getAddress()).incrementAndGet();
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            @Override
 | 
	
		
			
				|  |  | +            public void onNodeDisconnected(DiscoveryNode node) {
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  | +        final DiscoveryNode node = new DiscoveryNode(nodeId, transportService.boundAddress().publishAddress(), emptyMap(), emptySet(), version);
 | 
	
		
			
				|  |  | +        transportService.setLocalNode(node);
 | 
	
		
			
				|  |  | +        return new NetworkHandle((InetSocketTransportAddress)transport.boundAddress().publishAddress(), transportService, node, counters);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    private static class NetworkHandle {
 | 
	
		
			
				|  |  | +        public final InetSocketTransportAddress address;
 | 
	
		
			
				|  |  | +        public final TransportService transportService;
 | 
	
		
			
				|  |  | +        public final DiscoveryNode node;
 | 
	
		
			
				|  |  | +        public final ConcurrentMap<TransportAddress, AtomicInteger> counters;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        public NetworkHandle(InetSocketTransportAddress address, TransportService transportService, DiscoveryNode discoveryNode, ConcurrentMap<TransportAddress, AtomicInteger> counters) {
 | 
	
		
			
				|  |  | +            this.address = address;
 | 
	
		
			
				|  |  | +            this.transportService = transportService;
 | 
	
		
			
				|  |  | +            this.node = discoveryNode;
 | 
	
		
			
				|  |  | +            this.counters = counters;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 |