[8.18] Assumed cluster features infrastructure (#118143) (#118848)

Backport #118143 to 8.18
Simon Cooper · 10 months ago · commit 51fd70d349

+ 1 - 1
build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt

@@ -162,7 +162,7 @@ org.elasticsearch.cluster.ClusterFeatures#nodeFeatures()
 @defaultMessage ClusterFeatures#allNodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
 org.elasticsearch.cluster.ClusterFeatures#allNodeFeatures()
 @defaultMessage ClusterFeatures#clusterHasFeature is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
-org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.features.NodeFeature)
+org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.cluster.node.DiscoveryNodes, org.elasticsearch.features.NodeFeature)
 
 @defaultMessage Do not construct these records outside the source files they are declared in
 org.elasticsearch.cluster.SnapshotsInProgress$ShardSnapshotStatus#<init>(java.lang.String, org.elasticsearch.cluster.SnapshotsInProgress$ShardState, org.elasticsearch.repositories.ShardGeneration, java.lang.String, org.elasticsearch.repositories.ShardSnapshotResult)

+ 5 - 0
docs/changelog/118143.yaml

@@ -0,0 +1,5 @@
+pr: 118143
+summary: Infrastructure for assuming cluster features in the next major version
+area: "Infra/Core"
+type: feature
+issues: []

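For orientation, here is a rough sketch of what this infrastructure enables for feature owners, assuming the API shape introduced below (the MyPluginFeatures class and the feature id are hypothetical): a feature that every node already publishes can be flagged as assumed, so the next major version can stop publishing it and eventually remove the check entirely.

import java.util.Set;

import org.elasticsearch.features.FeatureSpecification;
import org.elasticsearch.features.NodeFeature;

// Hypothetical feature specification; the class name and feature id are illustrative only.
public class MyPluginFeatures implements FeatureSpecification {
    // The boolean flag is the new NodeFeature field added in this commit: it marks the
    // feature as assumed after the next compatibility boundary (the next major version).
    public static final NodeFeature MY_FEATURE = new NodeFeature("my_plugin.my_feature", true);

    @Override
    public Set<NodeFeature> getFeatures() {
        return Set.of(MY_FEATURE);
    }
}
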
+ 44 - 2
server/src/main/java/org/elasticsearch/cluster/ClusterFeatures.java

@@ -9,6 +9,8 @@
 
 package org.elasticsearch.cluster;
 
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ChunkedToXContent;
@@ -92,6 +94,22 @@ public class ClusterFeatures implements Diffable<ClusterFeatures>, ChunkedToXCon
         return allNodeFeatures;
     }
 
+    /**
+     * Returns {@code true} if {@code node} can have assumed features.
+     * @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures
+     */
+    public static boolean featuresCanBeAssumedForNode(DiscoveryNode node) {
+        return node.getBuildVersion().canRemoveAssumedFeatures();
+    }
+
+    /**
+     * Returns {@code true} if one or more nodes in {@code nodes} can have assumed features.
+     * @see org.elasticsearch.env.BuildVersion#canRemoveAssumedFeatures
+     */
+    public static boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) {
+        return nodes.getAllNodes().stream().anyMatch(n -> n.getBuildVersion().canRemoveAssumedFeatures());
+    }
+
     /**
      * {@code true} if {@code feature} is present on all nodes in the cluster.
      * <p>
@@ -99,8 +117,32 @@ public class ClusterFeatures implements Diffable<ClusterFeatures>, ChunkedToXCon
      * Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead.
      */
     @SuppressForbidden(reason = "directly reading cluster features")
-    public boolean clusterHasFeature(NodeFeature feature) {
-        return allNodeFeatures().contains(feature.id());
+    public boolean clusterHasFeature(DiscoveryNodes nodes, NodeFeature feature) {
+        assert nodes.getNodes().keySet().equals(nodeFeatures.keySet())
+            : "Cluster features nodes " + nodeFeatures.keySet() + " is different to discovery nodes " + nodes.getNodes().keySet();
+
+        // basic case
+        boolean allNodesHaveFeature = allNodeFeatures().contains(feature.id());
+        if (allNodesHaveFeature) {
+            return true;
+        }
+
+        // if the feature is assumed, check the node versions more closely:
+        // it's still ok if every node missing the feature is allowed to assume it
+        // TODO: do we need some kind of transient cache of this calculation?
+        if (feature.assumedAfterNextCompatibilityBoundary()) {
+            for (var nf : nodeFeatures.entrySet()) {
+                if (nf.getValue().contains(feature.id()) == false
+                    && featuresCanBeAssumedForNode(nodes.getNodes().get(nf.getKey())) == false) {
+                    return false;
+                }
+            }
+
+            // all nodes missing the feature can assume it, so the feature is effectively present
+            return true;
+        }
+
+        return false;
     }
 
     /**

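A minimal test-style sketch of the check above, reusing utilities that appear in the tests later in this commit (the node ids and the "af1" feature are illustrative; nextMajor() is the tests' helper returning Version.fromId((Version.CURRENT.major + 1) * 1_000_000 + 99)). Note that this direct call is forbidden outside internal code; production callers should go through FeatureService#clusterHasFeature, per the es-server-signatures entry above.

DiscoveryNode oldNode = DiscoveryNodeUtils.create("n1");
DiscoveryNode newNode = DiscoveryNodeUtils.builder("n2")
    .version(new VersionInformation(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current()))
    .build();
ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
    .nodes(DiscoveryNodes.builder().add(oldNode).add(newNode))
    .nodeFeatures(Map.of("n1", Set.of("af1"), "n2", Set.of())) // n2 does not publish "af1"
    .build();

// "af1" is assumed, and the only node missing it is a next-major node that may assume it,
// so the cluster still reports the feature as present.
boolean present = state.clusterFeatures().clusterHasFeature(state.nodes(), new NodeFeature("af1", true));
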
+ 73 - 8
server/src/main/java/org/elasticsearch/cluster/coordination/NodeJoinExecutor.java

@@ -29,6 +29,7 @@ import org.elasticsearch.cluster.version.CompatibilityVersions;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.features.FeatureService;
+import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
@@ -39,6 +40,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -137,8 +139,8 @@ public class NodeJoinExecutor implements ClusterStateTaskExecutor<JoinTask> {
 
         DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
         Map<String, CompatibilityVersions> compatibilityVersionsMap = new HashMap<>(newState.compatibilityVersions());
-        Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures());
-        Set<String> allNodesFeatures = ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
+        Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures()); // as present in cluster state
+        Set<String> effectiveClusterFeatures = calculateEffectiveClusterFeatures(newState.nodes(), nodeFeatures);
 
         assert nodesBuilder.isLocalNodeElectedMaster();
 
@@ -174,14 +176,17 @@ public class NodeJoinExecutor implements ClusterStateTaskExecutor<JoinTask> {
                         }
                         blockForbiddenVersions(compatibilityVersions.transportVersion());
                         ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
-                        enforceNodeFeatureBarrier(node.getId(), allNodesFeatures, features);
+                        Set<String> newNodeEffectiveFeatures = enforceNodeFeatureBarrier(node, effectiveClusterFeatures, features);
                         // we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
                         // we have to reject nodes that don't support all indices we have in this cluster
                         ensureIndexCompatibility(node.getMinIndexVersion(), node.getMaxIndexVersion(), initialState.getMetadata());
+
                         nodesBuilder.add(node);
                         compatibilityVersionsMap.put(node.getId(), compatibilityVersions);
+                        // store the actual node features here, not including assumed features, as this is persisted in cluster state
                         nodeFeatures.put(node.getId(), features);
-                        allNodesFeatures.retainAll(features);
+                        effectiveClusterFeatures.retainAll(newNodeEffectiveFeatures);
+
                         nodesChanged = true;
                         minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
                         maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
@@ -355,6 +360,35 @@ public class NodeJoinExecutor implements ClusterStateTaskExecutor<JoinTask> {
         }
     }
 
+    /**
+     * Calculate the cluster's effective features: the features present on every node,
+     * treating any feature that a node is allowed to assume as present on that node.
+     */
+    private Set<String> calculateEffectiveClusterFeatures(DiscoveryNodes nodes, Map<String, Set<String>> nodeFeatures) {
+        if (featureService.featuresCanBeAssumedForNodes(nodes)) {
+            Set<String> assumedFeatures = featureService.getNodeFeatures()
+                .values()
+                .stream()
+                .filter(NodeFeature::assumedAfterNextCompatibilityBoundary)
+                .map(NodeFeature::id)
+                .collect(Collectors.toSet());
+
+            // add all assumed features to the feature set of every node of the next major version
+            nodeFeatures = new HashMap<>(nodeFeatures);
+            for (var node : nodes.getNodes().entrySet()) {
+                if (featureService.featuresCanBeAssumedForNode(node.getValue())) {
+                    assert nodeFeatures.containsKey(node.getKey()) : "Node " + node.getKey() + " does not have any features";
+                    nodeFeatures.computeIfPresent(node.getKey(), (k, v) -> {
+                        var newFeatures = new HashSet<>(v);
+                        return newFeatures.addAll(assumedFeatures) ? newFeatures : v;
+                    });
+                }
+            }
+        }
+
+        return ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
+    }
+
     /**
      * Ensures that all indices are compatible with the given index version. This will ensure that all indices in the given metadata
      * will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index
@@ -461,13 +495,44 @@ public class NodeJoinExecutor implements ClusterStateTaskExecutor<JoinTask> {
         }
     }
 
-    private void enforceNodeFeatureBarrier(String nodeId, Set<String> existingNodesFeatures, Set<String> newNodeFeatures) {
+    /**
+     * Enforces the feature join barrier: a joining node must have every feature that is already present on all existing nodes in the cluster.
+     *
+     * @return The set of features that this node has (including assumed features)
+     */
+    private Set<String> enforceNodeFeatureBarrier(DiscoveryNode node, Set<String> effectiveClusterFeatures, Set<String> newNodeFeatures) {
         // prevent join if it does not have one or more features that all other nodes have
-        Set<String> missingFeatures = new HashSet<>(existingNodesFeatures);
+        Set<String> missingFeatures = new HashSet<>(effectiveClusterFeatures);
         missingFeatures.removeAll(newNodeFeatures);
 
-        if (missingFeatures.isEmpty() == false) {
-            throw new IllegalStateException("Node " + nodeId + " is missing required features " + missingFeatures);
+        if (missingFeatures.isEmpty()) {
+            // nothing missing - all ok
+            return newNodeFeatures;
+        }
+
+        if (featureService.featuresCanBeAssumedForNode(node)) {
+            // it might still be ok for this node to join if this node can have assumed features,
+            // and all the missing features are assumed
+            // we can get the NodeFeature object directly from this node's registered features,
+            // as all existing nodes in the cluster have the features present in effectiveClusterFeatures, including this one
+            newNodeFeatures = new HashSet<>(newNodeFeatures);
+            for (Iterator<String> it = missingFeatures.iterator(); it.hasNext();) {
+                String feature = it.next();
+                NodeFeature nf = featureService.getNodeFeatures().get(feature);
+                if (nf.assumedAfterNextCompatibilityBoundary()) {
+                    // it's ok for this feature to be missing from this node
+                    it.remove();
+                    // and it should be assumed to still be in the cluster
+                    newNodeFeatures.add(feature);
+                }
+                // even if we don't remove it, still continue, so the exception message below is accurate
+            }
+        }
+
+        if (missingFeatures.isEmpty()) {
+            return newNodeFeatures;
+        } else {
+            throw new IllegalStateException("Node " + node.getId() + " is missing required features " + missingFeatures);
         }
     }
 

+ 5 - 0
server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.util.StringLiteralDeduplicator;
 import org.elasticsearch.core.Nullable;
+import org.elasticsearch.env.BuildVersion;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.node.Node;
@@ -503,6 +504,10 @@ public class DiscoveryNode implements Writeable, ToXContentFragment {
         return this.versionInfo.nodeVersion();
     }
 
+    public BuildVersion getBuildVersion() {
+        return BuildVersion.fromVersionId(getVersion().id);
+    }
+
     public OptionalInt getPre811VersionId() {
         // Even if Version is removed from this class completely it will need to read the version ID
         // off the wire for old node versions, so the value of this variable can be obtained from that

+ 6 - 0
server/src/main/java/org/elasticsearch/env/BuildVersion.java

@@ -33,6 +33,12 @@ import java.util.ServiceLoader;
  */
 public abstract class BuildVersion {
 
+    /**
+     * Checks if this version can operate properly in a cluster that does not publish features
+     * that are assumed by the currently running version of Elasticsearch.
+     */
+    public abstract boolean canRemoveAssumedFeatures();
+
     /**
      * Check whether this version is on or after a minimum threshold.
      *

+ 11 - 0
server/src/main/java/org/elasticsearch/env/DefaultBuildVersion.java

@@ -37,6 +37,17 @@ final class DefaultBuildVersion extends BuildVersion {
         this.version = Version.fromId(versionId);
     }
 
+    @Override
+    public boolean canRemoveAssumedFeatures() {
+        /*
+         * We can remove assumed features if the node version is the next major version.
+         * This is because the next major version can only form a cluster with the
+         * latest minor version of the previous major, so any features introduced before that point
+         * (that are marked as assumed in the running code version) are automatically met by that version.
+         */
+        return version.major == Version.CURRENT.major + 1;
+    }
+
     @Override
     public boolean onOrAfterMinimumCompatible() {
         return Version.CURRENT.minimumCompatibilityVersion().onOrBefore(version);

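A short worked example of this rule, assuming the running version is 8.x (the version id arithmetic matches the tests' nextMajor() helper later in this commit):

// A 9.x node may drop features that the running 8.x code assumes...
BuildVersion nextMajor = BuildVersion.fromVersionId((Version.CURRENT.major + 1) * 1_000_000 + 99);
assert nextMajor.canRemoveAssumedFeatures();

// ...but a node of the current major must still publish them.
BuildVersion sameMajor = BuildVersion.fromVersionId(Version.CURRENT.id);
assert sameMajor.canRemoveAssumedFeatures() == false;
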
+ 18 - 2
server/src/main/java/org/elasticsearch/features/FeatureService.java

@@ -10,7 +10,10 @@
 package org.elasticsearch.features;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterFeatures;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
@@ -44,7 +47,6 @@ public class FeatureService {
      * as the local node's supported feature set
      */
     public FeatureService(List<? extends FeatureSpecification> specs) {
-
         var featureData = FeatureData.createFromSpecifications(specs);
         nodeFeatures = featureData.getNodeFeatures();
         historicalFeatures = featureData.getHistoricalFeatures();
@@ -60,12 +62,26 @@ public class FeatureService {
         return nodeFeatures;
     }
 
+    /**
+     * Returns {@code true} if {@code node} can have assumed features.
+     */
+    public boolean featuresCanBeAssumedForNode(DiscoveryNode node) {
+        return ClusterFeatures.featuresCanBeAssumedForNode(node);
+    }
+
+    /**
+     * Returns {@code true} if one or more nodes in {@code nodes} can have assumed features.
+     */
+    public boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) {
+        return ClusterFeatures.featuresCanBeAssumedForNodes(nodes);
+    }
+
     /**
      * Returns {@code true} if all nodes in {@code state} support feature {@code feature}.
      */
     @SuppressForbidden(reason = "We need basic feature information from cluster state")
     public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
-        if (state.clusterFeatures().clusterHasFeature(feature)) {
+        if (state.clusterFeatures().clusterHasFeature(state.getNodes(), feature)) {
             return true;
         }
 

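As the es-server-signatures change at the top of this commit enforces, external callers should use FeatureService rather than ClusterFeatures directly. A minimal sketch, reusing the hypothetical MyPluginFeatures spec from earlier (clusterState is assumed to be in scope, e.g. from ClusterService#state()):

FeatureService featureService = new FeatureService(List.of(new MyPluginFeatures()));
// One call covers regular features, assumed features, and historical (version-based) features.
boolean supported = featureService.clusterHasFeature(clusterState, MyPluginFeatures.MY_FEATURE);
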
+ 8 - 1
server/src/main/java/org/elasticsearch/features/NodeFeature.java

@@ -15,10 +15,17 @@ import java.util.Objects;
  * A feature published by a node.
  *
  * @param id        The feature id. Must be unique in the node.
+ * @param assumedAfterNextCompatibilityBoundary
+ *              {@code true} if this feature is removed at the next compatibility boundary (i.e. the next major version),
+ *              and so should be assumed to be present on all nodes after that boundary.
  */
-public record NodeFeature(String id) {
+public record NodeFeature(String id, boolean assumedAfterNextCompatibilityBoundary) {
 
     public NodeFeature {
         Objects.requireNonNull(id);
     }
+
+    public NodeFeature(String id) {
+        this(id, false);
+    }
 }

+ 2 - 2
server/src/main/java/org/elasticsearch/readiness/ReadinessService.java

@@ -294,8 +294,8 @@ public class ReadinessService extends AbstractLifecycleComponent implements Clus
     }
 
     @SuppressForbidden(reason = "need to check file settings support on exact cluster state")
-    private static boolean supportsFileSettings(ClusterState clusterState) {
-        return clusterState.clusterFeatures().clusterHasFeature(FileSettingsFeatures.FILE_SETTINGS_SUPPORTED);
+    private boolean supportsFileSettings(ClusterState clusterState) {
+        return clusterState.clusterFeatures().clusterHasFeature(clusterState.nodes(), FileSettingsFeatures.FILE_SETTINGS_SUPPORTED);
     }
 
     private void setReady(boolean ready) {

+ 21 - 8
server/src/test/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingServiceTests.java

@@ -19,6 +19,8 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadataStats;
 import org.elasticsearch.cluster.metadata.IndexWriteLoad;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
@@ -110,6 +112,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
         );
         builder.put(dataStream);
         ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
             .nodeFeatures(
                 Map.of(
                     "n1",
@@ -143,8 +146,9 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             // cluster doesn't have feature
             ClusterState stateNoFeature = ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder()).build();
 
+            Settings settings = Settings.builder().put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED, true).build();
             DataStreamAutoShardingService noFeatureService = new DataStreamAutoShardingService(
-                Settings.builder().put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED, true).build(),
+                settings,
                 clusterService,
                 new FeatureService(List.of()),
                 () -> now
@@ -155,15 +159,16 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
         }
 
         {
+            Settings settings = Settings.builder()
+                .put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED, true)
+                .putList(
+                    DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey(),
+                    List.of("foo", dataStreamName + "*")
+                )
+                .build();
             // patterns are configured to exclude the current data stream
             DataStreamAutoShardingService noFeatureService = new DataStreamAutoShardingService(
-                Settings.builder()
-                    .put(DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED, true)
-                    .putList(
-                        DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_EXCLUDES_SETTING.getKey(),
-                        List.of("foo", dataStreamName + "*")
-                    )
-                    .build(),
+                settings,
                 clusterService,
                 new FeatureService(List.of()),
                 () -> now
@@ -199,6 +204,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             DataStream dataStream = dataStreamSupplier.apply(null);
             builder.put(dataStream);
             ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
                 .nodeFeatures(
                     Map.of(
                         "n1",
@@ -237,6 +243,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             );
             builder.put(dataStream);
             ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
                 .nodeFeatures(
                     Map.of(
                         "n1",
@@ -275,6 +282,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             );
             builder.put(dataStream);
             ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
                 .nodeFeatures(
                     Map.of(
                         "n1",
@@ -313,6 +321,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             DataStream dataStream = dataStreamSupplier.apply(null);
             builder.put(dataStream);
             ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
                 .nodeFeatures(
                     Map.of(
                         "n1",
@@ -353,6 +362,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             DataStream dataStream = dataStreamSupplier.apply(null);
             builder.put(dataStream);
             ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
                 .nodeFeatures(
                     Map.of(
                         "n1",
@@ -401,6 +411,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             );
             builder.put(dataStream);
             ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
                 .nodeFeatures(
                     Map.of(
                         "n1",
@@ -447,6 +458,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             );
             builder.put(dataStream);
             ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
                 .nodeFeatures(
                     Map.of(
                         "n1",
@@ -487,6 +499,7 @@ public class DataStreamAutoShardingServiceTests extends ESTestCase {
             DataStream dataStream = dataStreamSupplier.apply(null);
             builder.put(dataStream);
             ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("n1")).add(DiscoveryNodeUtils.create("n2")))
                 .nodeFeatures(
                     Map.of(
                         "n1",

+ 228 - 4
server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinExecutorTests.java

@@ -33,6 +33,7 @@ import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.features.FeatureService;
 import org.elasticsearch.features.FeatureSpecification;
 import org.elasticsearch.features.NodeFeature;
@@ -46,11 +47,13 @@ import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
 import static org.elasticsearch.cluster.metadata.DesiredNodesTestCase.assertDesiredNodesStatusIsCorrect;
@@ -227,6 +230,227 @@ public class NodeJoinExecutorTests extends ESTestCase {
         );
     }
 
+    @SuppressForbidden(reason = "we need to actually check what is in cluster state")
+    private static Map<String, Set<String>> getRecordedNodeFeatures(ClusterState state) {
+        return state.clusterFeatures().nodeFeatures();
+    }
+
+    private static Version nextMajor() {
+        return Version.fromId((Version.CURRENT.major + 1) * 1_000_000 + 99);
+    }
+
+    public void testCanJoinClusterWithAssumedFeatures() throws Exception {
+        AllocationService allocationService = createAllocationService();
+        RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
+        FeatureService featureService = new FeatureService(List.of(new FeatureSpecification() {
+            @Override
+            public Set<NodeFeature> getFeatures() {
+                return Set.of(new NodeFeature("f1"), new NodeFeature("af1", true), new NodeFeature("af2", true));
+            }
+        }));
+
+        NodeJoinExecutor executor = new NodeJoinExecutor(allocationService, rerouteService, featureService);
+
+        DiscoveryNode masterNode = DiscoveryNodeUtils.create(UUIDs.base64UUID());
+        DiscoveryNode otherNode = DiscoveryNodeUtils.create(UUIDs.base64UUID());
+        Map<String, Set<String>> features = new HashMap<>();
+        features.put(masterNode.getId(), Set.of("f1", "af1", "af2"));
+        features.put(otherNode.getId(), Set.of("f1", "af1", "af2"));
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()).add(otherNode))
+            .nodeFeatures(features)
+            .build();
+
+        // it is valid for major+1 versions to join clusters with assumed features still present
+        // this can happen in the process of marking, then removing, assumed features
+        // they should still be recorded appropriately
+        DiscoveryNode newNode = DiscoveryNodeUtils.builder(UUIDs.base64UUID())
+            .version(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current())
+            .build();
+        clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
+            clusterState,
+            executor,
+            List.of(
+                JoinTask.singleNode(
+                    newNode,
+                    CompatibilityVersionsUtils.staticCurrent(),
+                    Set.of("f1", "af2"),
+                    TEST_REASON,
+                    NO_FAILURE_LISTENER,
+                    0L
+                )
+            )
+        );
+        features.put(newNode.getId(), Set.of("f1", "af2"));
+
+        // extra final check that the recorded cluster features are as they should be
+        assertThat(getRecordedNodeFeatures(clusterState), equalTo(features));
+    }
+
+    public void testJoinClusterWithAssumedFeaturesDoesntAllowNonAssumed() throws Exception {
+        AllocationService allocationService = createAllocationService();
+        RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
+        FeatureService featureService = new FeatureService(List.of(new FeatureSpecification() {
+            @Override
+            public Set<NodeFeature> getFeatures() {
+                return Set.of(new NodeFeature("f1"), new NodeFeature("af1", true));
+            }
+        }));
+
+        NodeJoinExecutor executor = new NodeJoinExecutor(allocationService, rerouteService, featureService);
+
+        DiscoveryNode masterNode = DiscoveryNodeUtils.create(UUIDs.base64UUID());
+        DiscoveryNode otherNode = DiscoveryNodeUtils.create(UUIDs.base64UUID());
+        Map<String, Set<String>> features = new HashMap<>();
+        features.put(masterNode.getId(), Set.of("f1", "af1"));
+        features.put(otherNode.getId(), Set.of("f1", "af1"));
+
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()).add(otherNode))
+            .nodeFeatures(features)
+            .build();
+
+        DiscoveryNode newNodeNextMajor = DiscoveryNodeUtils.builder(UUIDs.base64UUID())
+            .version(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current())
+            .build();
+        clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
+            clusterState,
+            executor,
+            List.of(
+                JoinTask.singleNode(
+                    newNodeNextMajor,
+                    CompatibilityVersionsUtils.staticCurrent(),
+                    Set.of("f1"),
+                    TEST_REASON,
+                    NO_FAILURE_LISTENER,
+                    0L
+                )
+            )
+        );
+        features.put(newNodeNextMajor.getId(), Set.of("f1"));
+
+        // even though a next-major node has joined without af1, this doesn't allow a current-major node to join while missing af1
+        DiscoveryNode newNodeCurMajor = DiscoveryNodeUtils.create(UUIDs.base64UUID());
+        AtomicReference<Exception> ex = new AtomicReference<>();
+        clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
+            clusterState,
+            executor,
+            List.of(
+                JoinTask.singleNode(
+                    newNodeCurMajor,
+                    CompatibilityVersionsUtils.staticCurrent(),
+                    Set.of("f1"),
+                    TEST_REASON,
+                    ActionTestUtils.assertNoSuccessListener(ex::set),
+                    0L
+                )
+            )
+        );
+        assertThat(ex.get().getMessage(), containsString("missing required features [af1]"));
+
+        // a next-major node can't join while missing non-assumed features
+        DiscoveryNode newNodeNextMajorMissing = DiscoveryNodeUtils.builder(UUIDs.base64UUID())
+            .version(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current())
+            .build();
+        ex.set(null);
+        clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
+            clusterState,
+            executor,
+            List.of(
+                JoinTask.singleNode(
+                    newNodeNextMajorMissing,
+                    CompatibilityVersionsUtils.staticCurrent(),
+                    Set.of(),
+                    TEST_REASON,
+                    ActionTestUtils.assertNoSuccessListener(ex::set),
+                    0L
+                )
+            )
+        );
+        assertThat(ex.get().getMessage(), containsString("missing required features [f1]"));
+
+        // extra final check that the recorded cluster features are as they should be, and newNodeNextMajor hasn't gained af1
+        assertThat(getRecordedNodeFeatures(clusterState), equalTo(features));
+    }
+
+    /*
+     * Same as above, but the current-major node with missing features is processed in the same execution
+     */
+    public void testJoinClusterWithAssumedFeaturesDoesntAllowNonAssumedSameExecute() throws Exception {
+        AllocationService allocationService = createAllocationService();
+        RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
+        FeatureService featureService = new FeatureService(List.of(new FeatureSpecification() {
+            @Override
+            public Set<NodeFeature> getFeatures() {
+                return Set.of(new NodeFeature("f1"), new NodeFeature("af1", true));
+            }
+        }));
+
+        NodeJoinExecutor executor = new NodeJoinExecutor(allocationService, rerouteService, featureService);
+
+        DiscoveryNode masterNode = DiscoveryNodeUtils.create(UUIDs.base64UUID());
+        DiscoveryNode otherNode = DiscoveryNodeUtils.create(UUIDs.base64UUID());
+        Map<String, Set<String>> features = new HashMap<>();
+        features.put(masterNode.getId(), Set.of("f1", "af1"));
+        features.put(otherNode.getId(), Set.of("f1", "af1"));
+
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()).add(otherNode))
+            .nodeFeatures(features)
+            .build();
+
+        DiscoveryNode newNodeNextMajor = DiscoveryNodeUtils.builder(UUIDs.base64UUID())
+            .version(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current())
+            .build();
+        DiscoveryNode newNodeCurMajor = DiscoveryNodeUtils.create(UUIDs.base64UUID());
+        DiscoveryNode newNodeNextMajorMissing = DiscoveryNodeUtils.builder(UUIDs.base64UUID())
+            .version(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current())
+            .build();
+        // even though a next-major node can join, this doesn't allow a current-major node to join with missing features,
+        // nor a next-major node missing non-assumed features
+        AtomicReference<Exception> thisMajorEx = new AtomicReference<>();
+        AtomicReference<Exception> nextMajorEx = new AtomicReference<>();
+        List<JoinTask> tasks = List.of(
+            JoinTask.singleNode(
+                newNodeNextMajor,
+                CompatibilityVersionsUtils.staticCurrent(),
+                Set.of("f1"),
+                TEST_REASON,
+                NO_FAILURE_LISTENER,
+                0L
+            ),
+            JoinTask.singleNode(
+                newNodeCurMajor,
+                CompatibilityVersionsUtils.staticCurrent(),
+                Set.of("f1"),
+                TEST_REASON,
+                ActionTestUtils.assertNoSuccessListener(thisMajorEx::set),
+                0L
+            ),
+            JoinTask.singleNode(
+                newNodeNextMajorMissing,
+                CompatibilityVersionsUtils.staticCurrent(),
+                Set.of(),
+                TEST_REASON,
+                ActionTestUtils.assertNoSuccessListener(nextMajorEx::set),
+                0L
+            )
+        );
+        if (randomBoolean()) {
+            // sometimes combine them into a single task for completeness
+            tasks = List.of(new JoinTask(tasks.stream().flatMap(t -> t.nodeJoinTasks().stream()).toList(), false, 0L, null));
+        }
+
+        clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(clusterState, executor, tasks);
+        features.put(newNodeNextMajor.getId(), Set.of("f1"));
+
+        assertThat(thisMajorEx.get().getMessage(), containsString("missing required features [af1]"));
+        assertThat(nextMajorEx.get().getMessage(), containsString("missing required features [f1]"));
+
+        // extra check that the recorded cluster features are as they should be, and newNodeNextMajor hasn't gained af1
+        assertThat(getRecordedNodeFeatures(clusterState), equalTo(features));
+    }
+
     public void testSuccess() {
         Settings.builder().build();
         Metadata.Builder metaBuilder = Metadata.builder();
@@ -921,8 +1145,8 @@ public class NodeJoinExecutorTests extends ESTestCase {
             .nodeFeatures(Map.of(masterNode.getId(), Set.of("f1", "f2"), rejoinNode.getId(), Set.of()))
             .build();
 
-        assertThat(clusterState.clusterFeatures().clusterHasFeature(new NodeFeature("f1")), is(false));
-        assertThat(clusterState.clusterFeatures().clusterHasFeature(new NodeFeature("f2")), is(false));
+        assertThat(clusterState.clusterFeatures().clusterHasFeature(clusterState.nodes(), new NodeFeature("f1")), is(false));
+        assertThat(clusterState.clusterFeatures().clusterHasFeature(clusterState.nodes(), new NodeFeature("f2")), is(false));
 
         final var resultingState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
             clusterState,
@@ -939,8 +1163,8 @@ public class NodeJoinExecutorTests extends ESTestCase {
             )
         );
 
-        assertThat(resultingState.clusterFeatures().clusterHasFeature(new NodeFeature("f1")), is(true));
-        assertThat(resultingState.clusterFeatures().clusterHasFeature(new NodeFeature("f2")), is(true));
+        assertThat(resultingState.clusterFeatures().clusterHasFeature(resultingState.nodes(), new NodeFeature("f1")), is(true));
+        assertThat(resultingState.clusterFeatures().clusterHasFeature(resultingState.nodes(), new NodeFeature("f2")), is(true));
     }
 
     private DesiredNodeWithStatus createActualizedDesiredNode() {

+ 36 - 0
server/src/test/java/org/elasticsearch/features/FeatureServiceTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.node.VersionInformation;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.IndexVersions;
 import org.elasticsearch.test.ESTestCase;
@@ -118,6 +119,12 @@ public class FeatureServiceTests extends ESTestCase {
         );
 
         ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(
+                DiscoveryNodes.builder()
+                    .add(DiscoveryNodeUtils.create("node1"))
+                    .add(DiscoveryNodeUtils.create("node2"))
+                    .add(DiscoveryNodeUtils.create("node3"))
+            )
             .nodeFeatures(
                 Map.of("node1", Set.of("f1", "f2", "nf1"), "node2", Set.of("f1", "f2", "nf2"), "node3", Set.of("f1", "f2", "nf1"))
             )
@@ -176,4 +183,33 @@ public class FeatureServiceTests extends ESTestCase {
         assertFalse(service.clusterHasFeature(stateWithMinVersion(Version.V_7_16_0), v8_10_0));
         assertFalse(service.clusterHasFeature(stateWithMinVersion(Version.V_7_16_0), v7_17_0));
     }
+
+    private static Version nextMajor() {
+        return Version.fromId((Version.CURRENT.major + 1) * 1_000_000 + 99);
+    }
+
+    public void testStateHasAssumedFeatures() {
+        List<FeatureSpecification> specs = List.of(
+            new TestFeatureSpecification(Set.of(new NodeFeature("f1"), new NodeFeature("f2"), new NodeFeature("af1", true)), Map.of())
+        );
+
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+            .nodes(
+                DiscoveryNodes.builder()
+                    .add(DiscoveryNodeUtils.create("node1"))
+                    .add(DiscoveryNodeUtils.create("node2"))
+                    .add(
+                        DiscoveryNodeUtils.builder("node3")
+                            .version(new VersionInformation(nextMajor(), IndexVersions.MINIMUM_COMPATIBLE, IndexVersion.current()))
+                            .build()
+                    )
+            )
+            .nodeFeatures(Map.of("node1", Set.of("f1", "af1"), "node2", Set.of("f1", "f2", "af1"), "node3", Set.of("f1", "f2")))
+            .build();
+
+        FeatureService service = new FeatureService(specs);
+        assertTrue(service.clusterHasFeature(state, new NodeFeature("f1")));
+        assertFalse(service.clusterHasFeature(state, new NodeFeature("f2")));
+        assertTrue(service.clusterHasFeature(state, new NodeFeature("af1", true)));
+    }
 }

+ 1 - 1
server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java

@@ -77,8 +77,8 @@ public class HealthNodeTaskExecutorTests extends ESTestCase {
         clusterService = createClusterService(threadPool);
         localNodeId = clusterService.localNode().getId();
         persistentTasksService = mock(PersistentTasksService.class);
-        featureService = new FeatureService(List.of(new HealthFeatures()));
         settings = Settings.builder().build();
+        featureService = new FeatureService(List.of(new HealthFeatures()));
         clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
     }
 

+ 3 - 0
x-pack/plugin/slm/src/test/java/org/elasticsearch/xpack/slm/SnapshotLifecycleServiceTests.java

@@ -529,6 +529,7 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
         var featureService = new FeatureService(List.of(new SnapshotLifecycleFeatures()));
         {
             ClusterState state = ClusterState.builder(new ClusterName("cluster"))
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("a")).add(DiscoveryNodeUtils.create("b")))
                 .nodeFeatures(Map.of("a", Set.of(), "b", Set.of(SnapshotLifecycleService.INTERVAL_SCHEDULE.id())))
                 .build();
 
@@ -540,6 +541,7 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
         }
         {
             ClusterState state = ClusterState.builder(new ClusterName("cluster"))
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("a")))
                 .nodeFeatures(Map.of("a", Set.of(SnapshotLifecycleService.INTERVAL_SCHEDULE.id())))
                 .build();
             try {
@@ -550,6 +552,7 @@ public class SnapshotLifecycleServiceTests extends ESTestCase {
         }
         {
             ClusterState state = ClusterState.builder(new ClusterName("cluster"))
+                .nodes(DiscoveryNodes.builder().add(DiscoveryNodeUtils.create("a")).add(DiscoveryNodeUtils.create("b")))
                 .nodeFeatures(Map.of("a", Set.of(), "b", Set.of(SnapshotLifecycleService.INTERVAL_SCHEDULE.id())))
                 .build();
             try {