|  | @@ -12,70 +12,157 @@ import org.elasticsearch.TransportVersions;
 | 
	
		
			
				|  |  |  import org.elasticsearch.Version;
 | 
	
		
			
				|  |  |  import org.elasticsearch.client.Request;
 | 
	
		
			
				|  |  |  import org.elasticsearch.common.util.Maps;
 | 
	
		
			
				|  |  | +import org.elasticsearch.test.rest.ObjectPath;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -import java.io.IOException;
 | 
	
		
			
				|  |  | -import java.util.List;
 | 
	
		
			
				|  |  |  import java.util.Map;
 | 
	
		
			
				|  |  | -import java.util.stream.Collectors;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import static org.hamcrest.Matchers.equalTo;
 | 
	
		
			
				|  |  |  import static org.hamcrest.Matchers.everyItem;
 | 
	
		
			
				|  |  | +import static org.hamcrest.Matchers.greaterThan;
 | 
	
		
			
				|  |  | +import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 | 
	
		
			
				|  |  | +import static org.hamcrest.Matchers.lessThan;
 | 
	
		
			
				|  |  | +import static org.hamcrest.Matchers.oneOf;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  public class TransportVersionClusterStateUpgradeIT extends AbstractUpgradeTestCase {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    public void testReadsInferredTransportVersions() throws IOException {
 | 
	
		
			
				|  |  | -        assumeTrue("TransportVersion introduced in 8.8.0", UPGRADE_FROM_VERSION.before(Version.V_8_8_0));
 | 
	
		
			
				|  |  | -        assumeTrue(
 | 
	
		
			
				|  |  | -            "This only has visible effects when upgrading beyond 8.8.0",
 | 
	
		
			
				|  |  | -            TransportVersion.current().after(TransportVersions.V_8_8_0)
 | 
	
		
			
				|  |  | -        );
 | 
	
		
			
				|  |  | -        assumeTrue("Only runs on the mixed cluster", CLUSTER_TYPE == ClusterType.MIXED);
 | 
	
		
			
				|  |  | -        // if the master is not upgraded, and the secondary node is, then the cluster info from the secondary
 | 
	
		
			
				|  |  | -        // should have inferred transport versions in it
 | 
	
		
			
				|  |  | -        // rely on randomisation to hit this case at some point in testing
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        Request request = new Request("GET", "/_cluster/state/nodes");
 | 
	
		
			
				|  |  | -        Map<String, Object> masterResponse = entityAsMap(client().performRequest(request));
 | 
	
		
			
				|  |  | -        assumeFalse("Master needs to not know about transport versions", masterResponse.containsKey("transport_versions"));
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        request = new Request("GET", "/_cluster/state/nodes?local=true");
 | 
	
		
			
				|  |  | -        Map<String, Object> localResponse = entityAsMap(client().performRequest(request));
 | 
	
		
			
				|  |  | -        // should either be empty, or using inferred versions
 | 
	
		
			
				|  |  | -        assumeTrue("Local node needs to know about transport versions", masterResponse.containsKey("transport_versions"));
 | 
	
		
			
				|  |  | -        Map<?, Version> vs = Maps.transformValues(
 | 
	
		
			
				|  |  | -            ((Map<?, ?>) localResponse.get("nodes")),
 | 
	
		
			
				|  |  | -            v -> Version.fromString(((Map<?, ?>) v).get("version").toString())
 | 
	
		
			
				|  |  | -        );
 | 
	
		
			
				|  |  | -        Map<?, TransportVersion> tvs = ((List<?>) localResponse.get("transport_versions")).stream()
 | 
	
		
			
				|  |  | -            .map(o -> (Map<?, ?>) o)
 | 
	
		
			
				|  |  | -            .collect(Collectors.toMap(m -> m.get("node_id"), m -> TransportVersion.fromString(m.get("transport_version").toString())));
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -        for (var ver : vs.entrySet()) {
 | 
	
		
			
				|  |  | -            if (ver.getValue().after(Version.V_8_8_0)) {
 | 
	
		
			
				|  |  | -                assertThat(
 | 
	
		
			
				|  |  | -                    "Node " + ver.getKey() + " should have an inferred transport version",
 | 
	
		
			
				|  |  | -                    tvs.get(ver.getKey()),
 | 
	
		
			
				|  |  | -                    equalTo(TransportVersions.V_8_8_0)
 | 
	
		
			
				|  |  | -                );
 | 
	
		
			
				|  |  | +    private static final Version VERSION_INTRODUCING_TRANSPORT_VERSIONS = Version.V_8_8_0;
 | 
	
		
			
				|  |  | +    private static final Version VERSION_INTRODUCING_NODES_VERSIONS = Version.V_8_11_0;
 | 
	
		
			
				|  |  | +    private static final TransportVersion FIRST_TRANSPORT_VERSION = TransportVersions.V_8_8_0;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    public void testReadsInferredTransportVersions() throws Exception {
 | 
	
		
			
				|  |  | +        assertEquals(VERSION_INTRODUCING_TRANSPORT_VERSIONS.id(), FIRST_TRANSPORT_VERSION.id());
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // waitUntil because the versions fixup on upgrade happens in the background so may need a retry
 | 
	
		
			
				|  |  | +        assertTrue(waitUntil(() -> {
 | 
	
		
			
				|  |  | +            try {
 | 
	
		
			
				|  |  | +                // check several responses in order to sample from a selection of nodes
 | 
	
		
			
				|  |  | +                for (int i = getClusterHosts().size(); i > 0; i--) {
 | 
	
		
			
				|  |  | +                    if (runTransportVersionsTest() == false) {
 | 
	
		
			
				|  |  | +                        return false;
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                return true;
 | 
	
		
			
				|  |  | +            } catch (Exception e) {
 | 
	
		
			
				|  |  | +                throw new AssertionError(e);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -        }
 | 
	
		
			
				|  |  | +        }));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    public void testCompletesRealTransportVersions() throws IOException {
 | 
	
		
			
				|  |  | -        assumeTrue("TransportVersion introduced in 8.8.0", UPGRADE_FROM_VERSION.before(Version.V_8_8_0));
 | 
	
		
			
				|  |  | -        assumeTrue(
 | 
	
		
			
				|  |  | -            "This only has visible effects when upgrading beyond 8.8.0",
 | 
	
		
			
				|  |  | -            TransportVersion.current().after(TransportVersions.V_8_8_0)
 | 
	
		
			
				|  |  | +    private boolean runTransportVersionsTest() throws Exception {
 | 
	
		
			
				|  |  | +        final var clusterState = ObjectPath.createFromResponse(
 | 
	
		
			
				|  |  | +            client().performRequest(new Request("GET", "/_cluster/state" + randomFrom("", "/nodes") + randomFrom("", "?local")))
 | 
	
		
			
				|  |  |          );
 | 
	
		
			
				|  |  | -        assumeTrue("Only runs on the upgraded cluster", CLUSTER_TYPE == ClusterType.UPGRADED);
 | 
	
		
			
				|  |  | -        // once everything is upgraded, the master should fill in the real transport versions
 | 
	
		
			
				|  |  | +        final var description = clusterState.toString();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final var nodeIds = clusterState.evaluateMapKeys("nodes");
 | 
	
		
			
				|  |  | +        final Map<String, Version> versionsByNodeId = Maps.newHashMapWithExpectedSize(nodeIds.size());
 | 
	
		
			
				|  |  | +        for (final var nodeId : nodeIds) {
 | 
	
		
			
				|  |  | +            versionsByNodeId.put(nodeId, Version.fromString(clusterState.evaluate("nodes." + nodeId + ".version")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        final var hasTransportVersions = clusterState.evaluate("transport_versions") != null;
 | 
	
		
			
				|  |  | +        final var hasNodesVersions = clusterState.evaluate("nodes_versions") != null;
 | 
	
		
			
				|  |  | +        assertFalse(description, hasNodesVersions && hasTransportVersions);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        switch (CLUSTER_TYPE) {
 | 
	
		
			
				|  |  | +            case OLD -> {
 | 
	
		
			
				|  |  | +                if (UPGRADE_FROM_VERSION.before(VERSION_INTRODUCING_TRANSPORT_VERSIONS)) {
 | 
	
		
			
				|  |  | +                    // Before 8.8.0 there was only DiscoveryNode#version
 | 
	
		
			
				|  |  | +                    assertFalse(description, hasTransportVersions);
 | 
	
		
			
				|  |  | +                    assertFalse(description, hasNodesVersions);
 | 
	
		
			
				|  |  | +                } else if (UPGRADE_FROM_VERSION.before(VERSION_INTRODUCING_NODES_VERSIONS)) {
 | 
	
		
			
				|  |  | +                    // In [8.8.0, 8.11.0) we exposed just transport_versions
 | 
	
		
			
				|  |  | +                    assertTrue(description, hasTransportVersions);
 | 
	
		
			
				|  |  | +                    assertFalse(description, hasNodesVersions);
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    // From 8.11.0 onwards we exposed nodes_versions
 | 
	
		
			
				|  |  | +                    assertFalse(description, hasTransportVersions);
 | 
	
		
			
				|  |  | +                    assertTrue(description, hasNodesVersions);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            case MIXED -> {
 | 
	
		
			
				|  |  | +                if (UPGRADE_FROM_VERSION.before(VERSION_INTRODUCING_TRANSPORT_VERSIONS)) {
 | 
	
		
			
				|  |  | +                    // Responding node might be <8.8.0 (so no extra versions) or >=8.11.0 (includes nodes_versions)
 | 
	
		
			
				|  |  | +                    assertFalse(description, hasTransportVersions);
 | 
	
		
			
				|  |  | +                } else if (UPGRADE_FROM_VERSION.before(VERSION_INTRODUCING_NODES_VERSIONS)) {
 | 
	
		
			
				|  |  | +                    // Responding node might be in [8.8.0, 8.11.0) (transport_versions) or >=8.11.0 (includes nodes_versions) but not both
 | 
	
		
			
				|  |  | +                    assertTrue(description, hasNodesVersions || hasTransportVersions);
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    // Responding node is ≥8.11.0 so has nodes_versions for sure
 | 
	
		
			
				|  |  | +                    assertFalse(description, hasTransportVersions);
 | 
	
		
			
				|  |  | +                    assertTrue(description, hasNodesVersions);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            case UPGRADED -> {
 | 
	
		
			
				|  |  | +                // All nodes are Version.CURRENT, ≥8.11.0, so we definitely have nodes_versions
 | 
	
		
			
				|  |  | +                assertFalse(description, hasTransportVersions);
 | 
	
		
			
				|  |  | +                assertTrue(description, hasNodesVersions);
 | 
	
		
			
				|  |  | +                assertThat(description, versionsByNodeId.values(), everyItem(equalTo(Version.CURRENT)));
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (hasTransportVersions) {
 | 
	
		
			
				|  |  | +            // Upgrading from [8.8.0, 8.11.0) and the responding node is still on the old version
 | 
	
		
			
				|  |  | +            assertThat(description, UPGRADE_FROM_VERSION, lessThan(VERSION_INTRODUCING_NODES_VERSIONS));
 | 
	
		
			
				|  |  | +            assertThat(description, UPGRADE_FROM_VERSION, greaterThanOrEqualTo(VERSION_INTRODUCING_TRANSPORT_VERSIONS));
 | 
	
		
			
				|  |  | +            assertNotEquals(description, ClusterType.UPGRADED, CLUSTER_TYPE);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        Request request = new Request("GET", "/_cluster/state/nodes");
 | 
	
		
			
				|  |  | -        Map<String, Object> response = entityAsMap(client().performRequest(request));
 | 
	
		
			
				|  |  | -        Map<?, TransportVersion> tvs = ((List<?>) response.get("transport_versions")).stream()
 | 
	
		
			
				|  |  | -            .map(o -> (Map<?, ?>) o)
 | 
	
		
			
				|  |  | -            .collect(Collectors.toMap(m -> m.get("node_id"), m -> TransportVersion.fromString(m.get("transport_version").toString())));
 | 
	
		
			
				|  |  | +            // transport_versions includes the correct version for all nodes, no inference is needed
 | 
	
		
			
				|  |  | +            assertEquals(description, nodeIds.size(), clusterState.evaluateArraySize("transport_versions"));
 | 
	
		
			
				|  |  | +            for (int i = 0; i < nodeIds.size(); i++) {
 | 
	
		
			
				|  |  | +                final var path = "transport_versions." + i;
 | 
	
		
			
				|  |  | +                final String nodeId = clusterState.evaluate(path + ".node_id");
 | 
	
		
			
				|  |  | +                final var nodeDescription = nodeId + "/" + description;
 | 
	
		
			
				|  |  | +                final var transportVersion = TransportVersion.fromString(clusterState.evaluate(path + ".transport_version"));
 | 
	
		
			
				|  |  | +                final var nodeVersion = versionsByNodeId.get(nodeId);
 | 
	
		
			
				|  |  | +                assertNotNull(nodeDescription, nodeVersion);
 | 
	
		
			
				|  |  | +                if (nodeVersion.equals(Version.CURRENT)) {
 | 
	
		
			
				|  |  | +                    assertEquals(nodeDescription, TransportVersion.current(), transportVersion);
 | 
	
		
			
				|  |  | +                } else if (nodeVersion.after(VERSION_INTRODUCING_TRANSPORT_VERSIONS)) {
 | 
	
		
			
				|  |  | +                    assertThat(nodeDescription, transportVersion, greaterThan(FIRST_TRANSPORT_VERSION));
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    assertEquals(nodeDescription, FIRST_TRANSPORT_VERSION, transportVersion);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        } else if (hasNodesVersions) {
 | 
	
		
			
				|  |  | +            // Either upgrading from ≥8.11.0 (the responding node might be old or new), or from <8.8.0 (the responding node is new)
 | 
	
		
			
				|  |  | +            assertFalse(description, UPGRADE_FROM_VERSION.before(VERSION_INTRODUCING_NODES_VERSIONS) && CLUSTER_TYPE == ClusterType.OLD);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +            // nodes_versions includes _a_ version for all nodes; it might be correct, or it might be inferred if we're upgrading from
 | 
	
		
			
				|  |  | +            // <8.8.0 and the master is still an old node or the TransportVersionsFixupListener hasn't run yet
 | 
	
		
			
				|  |  | +            assertEquals(description, nodeIds.size(), clusterState.evaluateArraySize("nodes_versions"));
 | 
	
		
			
				|  |  | +            for (int i = 0; i < nodeIds.size(); i++) {
 | 
	
		
			
				|  |  | +                final var path = "nodes_versions." + i;
 | 
	
		
			
				|  |  | +                final String nodeId = clusterState.evaluate(path + ".node_id");
 | 
	
		
			
				|  |  | +                final var nodeDescription = nodeId + "/" + description;
 | 
	
		
			
				|  |  | +                final var transportVersion = TransportVersion.fromString(clusterState.evaluate(path + ".transport_version"));
 | 
	
		
			
				|  |  | +                final var nodeVersion = versionsByNodeId.get(nodeId);
 | 
	
		
			
				|  |  | +                assertNotNull(nodeDescription, nodeVersion);
 | 
	
		
			
				|  |  | +                if (nodeVersion.equals(Version.CURRENT)) {
 | 
	
		
			
				|  |  | +                    // Either the responding node is upgraded or the upgrade is trivial; if the responding node is upgraded but the master
 | 
	
		
			
				|  |  | +                    // is not then its transport version may be temporarily inferred as 8.8.0 until TransportVersionsFixupListener runs.
 | 
	
		
			
				|  |  | +                    assertThat(
 | 
	
		
			
				|  |  | +                        nodeDescription,
 | 
	
		
			
				|  |  | +                        transportVersion,
 | 
	
		
			
				|  |  | +                        UPGRADE_FROM_VERSION.onOrAfter(VERSION_INTRODUCING_TRANSPORT_VERSIONS)
 | 
	
		
			
				|  |  | +                            ? equalTo(TransportVersion.current())
 | 
	
		
			
				|  |  | +                            : oneOf(TransportVersion.current(), FIRST_TRANSPORT_VERSION)
 | 
	
		
			
				|  |  | +                    );
 | 
	
		
			
				|  |  | +                    if (CLUSTER_TYPE == ClusterType.UPGRADED && transportVersion.equals(FIRST_TRANSPORT_VERSION)) {
 | 
	
		
			
				|  |  | +                        // TransportVersionsFixupListener should run soon, retry
 | 
	
		
			
				|  |  | +                        logger.info("{} - not fixed up yet, retrying", nodeDescription);
 | 
	
		
			
				|  |  | +                        return false;
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                } else if (nodeVersion.after(VERSION_INTRODUCING_TRANSPORT_VERSIONS)) {
 | 
	
		
			
				|  |  | +                    // There's no relationship between node versions and transport versions any more, although we can be sure of this:
 | 
	
		
			
				|  |  | +                    assertThat(nodeDescription, transportVersion, greaterThan(FIRST_TRANSPORT_VERSION));
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    // Responding node is not upgraded, and no later than 8.8.0, so we infer its version correctly.
 | 
	
		
			
				|  |  | +                    assertEquals(nodeDescription, TransportVersion.fromId(nodeVersion.id()), transportVersion);
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -        assertThat(tvs + " should be updated", tvs.values(), everyItem(equalTo(TransportVersion.current())));
 | 
	
		
			
				|  |  | +        return true;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  }
 |