Przeglądaj źródła

Operator/role mapping (#89667)

Implement file based settings handler for role mappings.

Co-authored-by: Yang Wang <ywangd@gmail.com>
Nikola Grcevski 3 lat temu
rodzic
commit
2b3dbdebf3
37 zmienionych plików z 1882 dodań i 137 usunięć
  1. 5 0
      docs/changelog/89667.yaml
  2. 2 2
      server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java
  3. 2 2
      server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java
  4. 2 2
      server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java
  5. 53 0
      server/src/main/java/org/elasticsearch/action/support/ReservedStateAwareHandledTransportAction.java
  6. 9 53
      server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java
  7. 7 0
      server/src/main/java/org/elasticsearch/cluster/metadata/ReservedStateMetadata.java
  8. 73 0
      server/src/main/java/org/elasticsearch/reservedstate/ActionWithReservedState.java
  9. 22 0
      server/src/main/java/org/elasticsearch/reservedstate/NonStateTransformResult.java
  10. 11 1
      server/src/main/java/org/elasticsearch/reservedstate/TransformState.java
  11. 192 32
      server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java
  12. 25 11
      server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java
  13. 1 0
      server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java
  14. 154 0
      server/src/test/java/org/elasticsearch/action/support/ReservedStateAwareHandledTransportActionTests.java
  15. 9 9
      server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java
  16. 199 2
      server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java
  17. 2 2
      x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportDeleteAutoscalingPolicyAction.java
  18. 2 2
      x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportPutAutoscalingPolicyAction.java
  19. 17 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/rolemapping/PutRoleMappingRequest.java
  20. 2 2
      x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java
  21. 2 2
      x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java
  22. 2 2
      x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportDeleteSnapshotLifecycleAction.java
  23. 2 2
      x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportPutSnapshotLifecycleAction.java
  24. 440 0
      x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/RoleMappingFileSettingsIT.java
  25. 4 0
      x-pack/plugin/security/src/main/java/module-info.java
  26. 33 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ReservedSecurityStateHandlerProvider.java
  27. 14 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
  28. 147 0
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/rolemapping/ReservedRoleMappingAction.java
  29. 21 4
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/rolemapping/TransportDeleteRoleMappingAction.java
  30. 23 4
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingAction.java
  31. 8 0
      x-pack/plugin/security/src/main/resources/META-INF/services/org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider
  32. 55 0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/LocalReservedSecurityStateHandlerProvider.java
  33. 5 0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/LocalStateSecurity.java
  34. 245 0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/reservedstate/ReservedRoleMappingActionTests.java
  35. 45 0
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportDeleteRoleMappingActionTests.java
  36. 39 2
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java
  37. 8 0
      x-pack/plugin/security/src/test/resources/META-INF/services/org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider

+ 5 - 0
docs/changelog/89667.yaml

@@ -0,0 +1,5 @@
+pr: 89667
+summary: Operator/role mapping
+area: Infra/Core
+type: enhancement
+issues: []

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java

@@ -75,12 +75,12 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster
     }
 
     @Override
-    protected Optional<String> reservedStateHandlerName() {
+    public Optional<String> reservedStateHandlerName() {
         return Optional.of(ReservedRepositoryAction.NAME);
     }
 
     @Override
-    protected Set<String> modifiedKeys(DeleteRepositoryRequest request) {
+    public Set<String> modifiedKeys(DeleteRepositoryRequest request) {
         return Set.of(request.name());
     }
 }

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java

@@ -72,12 +72,12 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod
     }
 
     @Override
-    protected Optional<String> reservedStateHandlerName() {
+    public Optional<String> reservedStateHandlerName() {
         return Optional.of(ReservedRepositoryAction.NAME);
     }
 
     @Override
-    protected Set<String> modifiedKeys(PutRepositoryRequest request) {
+    public Set<String> modifiedKeys(PutRepositoryRequest request) {
         return Set.of(request.name());
     }
 }

+ 2 - 2
server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

@@ -131,12 +131,12 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
     }
 
     @Override
-    protected Optional<String> reservedStateHandlerName() {
+    public Optional<String> reservedStateHandlerName() {
         return Optional.of(ReservedClusterSettingsAction.NAME);
     }
 
     @Override
-    protected Set<String> modifiedKeys(ClusterUpdateSettingsRequest request) {
+    public Set<String> modifiedKeys(ClusterUpdateSettingsRequest request) {
         Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
         return allSettings.keySet();
     }

+ 53 - 0
server/src/main/java/org/elasticsearch/action/support/ReservedStateAwareHandledTransportAction.java

@@ -0,0 +1,53 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.reservedstate.ActionWithReservedState;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.transport.TransportService;
+
+/**
+ * An extension of the {@link HandledTransportAction} class, which wraps the doExecute call with a check for clashes
+ * with the reserved cluster state.
+ */
+public abstract class ReservedStateAwareHandledTransportAction<Request extends ActionRequest, Response extends ActionResponse> extends
+    HandledTransportAction<Request, Response>
+    implements
+        ActionWithReservedState<Request> {
+    private final ClusterService clusterService;
+
+    protected ReservedStateAwareHandledTransportAction(
+        String actionName,
+        ClusterService clusterService,
+        TransportService transportService,
+        ActionFilters actionFilters,
+        Writeable.Reader<Request> requestReader
+    ) {
+        super(actionName, transportService, actionFilters, requestReader);
+        this.clusterService = clusterService;
+    }
+
+    /**
+     * A doExecute method wrapped with a check for clashes with updates to the reserved cluster state
+     */
+    protected abstract void doExecuteProtected(Task task, Request request, ActionListener<Response> listener);
+
+    @Override
+    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
+        assert reservedStateHandlerName().isPresent();
+
+        validateForReservedState(clusterService.state(), reservedStateHandlerName().get(), modifiedKeys(request), request.toString());
+        doExecuteProtected(task, request, listener);
+    }
+}

+ 9 - 53
server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java

@@ -23,7 +23,6 @@ import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -34,7 +33,7 @@ import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.node.NodeClosedException;
-import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
+import org.elasticsearch.reservedstate.ActionWithReservedState;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskCancelledException;
@@ -44,11 +43,7 @@ import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Optional;
-import java.util.Set;
 import java.util.function.Predicate;
 
 import static org.elasticsearch.core.Strings.format;
@@ -57,7 +52,9 @@ import static org.elasticsearch.core.Strings.format;
  * A base class for operations that needs to be performed on the master node.
  */
 public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extends
-    HandledTransportAction<Request, Response> {
+    HandledTransportAction<Request, Response>
+    implements
+        ActionWithReservedState<Request> {
 
     private static final Logger logger = LogManager.getLogger(TransportMasterNodeAction.class);
 
@@ -149,65 +146,24 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
         }
     }
 
-    /**
-     * Override this method if the master node action also has an {@link ReservedClusterStateHandler}
-     * interaction.
-     * <p>
-     * We need to check if certain settings or entities are allowed to be modified by the master node
-     * action, depending on if they are set as reserved in 'operator' mode (file based settings, modules, plugins).
-     *
-     * @return an Optional of the {@link ReservedClusterStateHandler} name
-     */
-    protected Optional<String> reservedStateHandlerName() {
-        return Optional.empty();
-    }
-
-    /**
-     * Override this method to return the keys of the cluster state or cluster entities that are modified by
-     * the Request object.
-     * <p>
-     * This method is used by the reserved state handler logic (see {@link ReservedClusterStateHandler})
-     * to verify if the keys don't conflict with an existing key set as reserved.
-     *
-     * @param request the TransportMasterNode request
-     * @return set of String keys intended to be modified/set/deleted by this request
-     */
-    protected Set<String> modifiedKeys(Request request) {
-        return Collections.emptySet();
-    }
-
     // package private for testing
-    void validateForImmutableState(Request request, ClusterState state) {
+    void validateForReservedState(Request request, ClusterState state) {
         Optional<String> handlerName = reservedStateHandlerName();
         assert handlerName.isPresent();
 
-        Set<String> modified = modifiedKeys(request);
-        List<String> errors = new ArrayList<>();
-
-        for (ReservedStateMetadata metadata : state.metadata().reservedStateMetadata().values()) {
-            Set<String> conflicts = metadata.conflicts(handlerName.get(), modified);
-            if (conflicts.isEmpty() == false) {
-                errors.add(format("[%s] set as read-only by [%s]", String.join(", ", conflicts), metadata.namespace()));
-            }
-        }
-
-        if (errors.isEmpty() == false) {
-            throw new IllegalArgumentException(
-                format("Failed to process request [%s] with errors: [%s]", request, String.join(", ", errors))
-            );
-        }
+        validateForReservedState(state, handlerName.get(), modifiedKeys(request), request.toString());
     }
 
     // package private for testing
-    boolean supportsImmutableState() {
+    boolean supportsReservedState() {
         return reservedStateHandlerName().isPresent();
     }
 
     @Override
     protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
         ClusterState state = clusterService.state();
-        if (supportsImmutableState()) {
-            validateForImmutableState(request, state);
+        if (supportsReservedState()) {
+            validateForReservedState(request, state);
         }
         logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
         if (task != null) {

+ 7 - 0
server/src/main/java/org/elasticsearch/cluster/metadata/ReservedStateMetadata.java

@@ -283,6 +283,13 @@ public record ReservedStateMetadata(
             return this;
         }
 
+        /**
+         * Returns the current handler metadata stored in the builder
+         */
+        public ReservedStateHandlerMetadata getHandler(String handlerName) {
+            return this.handlers.get(handlerName);
+        }
+
         /**
          * Builds an {@link ReservedStateMetadata} from this builder.
          *

+ 73 - 0
server/src/main/java/org/elasticsearch/reservedstate/ActionWithReservedState.java

@@ -0,0 +1,73 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.reservedstate;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.elasticsearch.core.Strings.format;
+
+public interface ActionWithReservedState<T> {
+    /**
+     * Override this method if the master node action also has an {@link ReservedClusterStateHandler}
+     * interaction.
+     * <p>
+     * We need to check if certain settings or entities are allowed to be modified by the master node
+     * action, depending on if they are set as reserved in 'operator' mode (file based settings, modules, plugins).
+     *
+     * @return an Optional of the {@link ReservedClusterStateHandler} name
+     */
+    default Optional<String> reservedStateHandlerName() {
+        return Optional.empty();
+    }
+
+    /**
+     * Override this method to return the keys of the cluster state or cluster entities that are modified by
+     * the Request object.
+     * <p>
+     * This method is used by the reserved state handler logic (see {@link ReservedClusterStateHandler})
+     * to verify if the keys don't conflict with an existing key set as reserved.
+     *
+     * @param request the TransportMasterNode request
+     * @return set of String keys intended to be modified/set/deleted by this request
+     */
+    default Set<String> modifiedKeys(T request) {
+        return Collections.emptySet();
+    }
+
+    /**
+     * Helper method that verifies for key clashes on reserved state updates
+     * @param state the current cluster state
+     * @param handlerName the name of the reserved state handler related to this implementation
+     * @param modified the set of modified keys by the related request
+     * @param request a string representation of the request for error reporting purposes
+     */
+    default void validateForReservedState(ClusterState state, String handlerName, Set<String> modified, String request) {
+        List<String> errors = new ArrayList<>();
+
+        for (ReservedStateMetadata metadata : state.metadata().reservedStateMetadata().values()) {
+            Set<String> conflicts = metadata.conflicts(handlerName, modified);
+            if (conflicts.isEmpty() == false) {
+                errors.add(format("[%s] set as read-only by [%s]", String.join(", ", conflicts), metadata.namespace()));
+            }
+        }
+
+        if (errors.isEmpty() == false) {
+            throw new IllegalArgumentException(
+                format("Failed to process request [%s] with errors: [%s]", request, String.join(", ", errors))
+            );
+        }
+    }
+}

+ 22 - 0
server/src/main/java/org/elasticsearch/reservedstate/NonStateTransformResult.java

@@ -0,0 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.reservedstate;
+
+import java.util.Set;
+
+/**
+ * A wrapper class for notifying listeners on non cluster state transformation operation completion.
+ * <p>
+ * Certain {@link ReservedClusterStateHandler} implementations may need to perform additional
+ * operations other than modifying the cluster state. This can range from cache
+ * invalidation to implementing state handlers that do not write to the cluster state, e.g. role mappings.
+ * These additional transformation steps are implemented as separate async operation after the validation of
+ * the cluster state update steps (trial run in {@link org.elasticsearch.reservedstate.service.ReservedClusterStateService}).
+ */
+public record NonStateTransformResult(String handlerName, Set<String> updatedKeys) {}

+ 11 - 1
server/src/main/java/org/elasticsearch/reservedstate/TransformState.java

@@ -8,14 +8,24 @@
 
 package org.elasticsearch.reservedstate;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
 
 import java.util.Set;
+import java.util.function.Consumer;
 
 /**
  * A {@link ClusterState} wrapper used by the ReservedClusterStateService to pass the
  * current state as well as previous keys set by an {@link ReservedClusterStateHandler} to each transform
  * step of the cluster state update.
  *
+ * Each {@link ReservedClusterStateHandler} can also provide a non cluster state transform consumer that should run after
+ * the cluster state is fully validated. This allows for handlers to perform extra steps, like clearing caches or saving
+ * other state outside the cluster state. The consumer, if provided, must return a {@link NonStateTransformResult} with
+ * the keys that will be saved as reserved in the cluster state.
  */
-public record TransformState(ClusterState state, Set<String> keys) {}
+public record TransformState(ClusterState state, Set<String> keys, Consumer<ActionListener<NonStateTransformResult>> nonStateTransform) {
+    public TransformState(ClusterState state, Set<String> keys) {
+        this(state, keys, null);
+    }
+}

+ 192 - 32
server/src/main/java/org/elasticsearch/reservedstate/service/ReservedClusterStateService.java

@@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
@@ -19,11 +20,15 @@ import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.reservedstate.NonStateTransformResult;
 import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
+import org.elasticsearch.reservedstate.TransformState;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.XContentParser;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -33,7 +38,10 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.ExceptionsHelper.stackTrace;
 import static org.elasticsearch.core.Strings.format;
+import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.checkMetadataVersion;
+import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.keysForHandler;
 
 /**
  * Controller class for storing and reserving a portion of the {@link ClusterState}
@@ -149,46 +157,110 @@ public class ReservedClusterStateService {
         ClusterState state = clusterService.state();
         ReservedStateMetadata existingMetadata = state.metadata().reservedStateMetadata().get(namespace);
 
-        clusterService.submitStateUpdateTask(
-            "reserved cluster state [" + namespace + "]",
-            new ReservedStateUpdateTask(
-                namespace,
-                reservedStateChunk,
-                handlers,
-                orderedHandlers,
-                (clusterState, errorState) -> saveErrorState(clusterState, errorState),
-                new ActionListener<>() {
-                    @Override
-                    public void onResponse(ActionResponse.Empty empty) {
-                        logger.info("Successfully applied new reserved cluster state for namespace [{}]", namespace);
-                        errorListener.accept(null);
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        // Don't spam the logs on repeated errors
-                        if (isNewError(existingMetadata, reservedStateVersion.version())) {
-                            logger.debug("Failed to apply reserved cluster state", e);
-                            errorListener.accept(e);
-                        } else {
-                            errorListener.accept(null);
+        // We check if we should exit early on the state version from clusterService. The ReservedStateUpdateTask
+        // will check again with the most current state version if this continues.
+        if (checkMetadataVersion(namespace, existingMetadata, reservedStateVersion) == false) {
+            errorListener.accept(null);
+            return;
+        }
+
+        // We trial run all handler validations to ensure that we can process all of the cluster state error free. During
+        // the trial run we collect 'consumers' (functions) for any non cluster state transforms that need to run.
+        var trialRunResult = trialRun(namespace, state, reservedStateChunk, orderedHandlers);
+        var error = checkAndReportError(namespace, trialRunResult.errors, state, reservedStateVersion);
+
+        if (error != null) {
+            errorListener.accept(error);
+            return;
+        }
+
+        // Since we have validated that the cluster state update can be correctly performed in the trial run, we now
+        // execute the non cluster state transforms. These are assumed to be async and we continue with the cluster state update
+        // after all have completed. This part of reserved cluster state update is non-atomic, some or all of the non-state
+        // transformations can succeed, and we can fail to eventually write the reserved cluster state.
+        executeNonStateTransformationSteps(trialRunResult.nonStateTransforms, new ActionListener<>() {
+            @Override
+            public void onResponse(Collection<NonStateTransformResult> nonStateTransformResults) {
+                // Once all of the non-state transformation results complete, we can proceed to
+                // do the final save of the cluster state. The non-state transformation reserved keys are applied
+                // to the reserved state after all other key handlers.
+                clusterService.submitStateUpdateTask(
+                    "reserved cluster state [" + namespace + "]",
+                    new ReservedStateUpdateTask(
+                        namespace,
+                        reservedStateChunk,
+                        nonStateTransformResults,
+                        handlers,
+                        orderedHandlers,
+                        (clusterState, errorState) -> saveErrorState(clusterState, errorState),
+                        new ActionListener<>() {
+                            @Override
+                            public void onResponse(ActionResponse.Empty empty) {
+                                logger.info("Successfully applied new reserved cluster state for namespace [{}]", namespace);
+                                errorListener.accept(null);
+                            }
+
+                            @Override
+                            public void onFailure(Exception e) {
+                                // Don't spam the logs on repeated errors
+                                if (isNewError(existingMetadata, reservedStateVersion.version())) {
+                                    logger.debug("Failed to apply reserved cluster state", e);
+                                    errorListener.accept(e);
+                                } else {
+                                    errorListener.accept(null);
+                                }
+                            }
                         }
-                    }
-                }
-            ),
-            ClusterStateTaskConfig.build(Priority.URGENT),
-            updateStateTaskExecutor
-        );
+                    ),
+                    ClusterStateTaskConfig.build(Priority.URGENT),
+                    updateStateTaskExecutor
+                );
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                // If we encounter an error while runnin the non-state transforms, we avoid saving any cluster state.
+                errorListener.accept(checkAndReportError(namespace, List.of(e.getMessage()), state, reservedStateVersion));
+            }
+        });
+    }
+
+    // package private for testing
+    Exception checkAndReportError(
+        String namespace,
+        List<String> errors,
+        ClusterState currentState,
+        ReservedStateVersion reservedStateVersion
+    ) {
+        // Any errors should be discovered through validation performed in the transform calls
+        if (errors.isEmpty() == false) {
+            logger.debug("Error processing state change request for [{}] with the following errors [{}]", namespace, errors);
+
+            var errorState = new ErrorState(
+                namespace,
+                reservedStateVersion.version(),
+                errors,
+                ReservedStateErrorMetadata.ErrorKind.VALIDATION
+            );
+
+            saveErrorState(currentState, errorState);
+
+            return new IllegalStateException("Error processing state change request for " + namespace + ", errors: " + errorState);
+        }
+
+        return null;
     }
 
     // package private for testing
     static boolean isNewError(ReservedStateMetadata existingMetadata, Long newStateVersion) {
         return (existingMetadata == null
             || existingMetadata.errorMetadata() == null
+            || newStateVersion <= 0 // version will be -1 when we can't even parse the file, it might be 0 on snapshot restore
             || existingMetadata.errorMetadata().version() < newStateVersion);
     }
 
-    private void saveErrorState(ClusterState clusterState, ErrorState errorState) {
+    // package private for testing
+    void saveErrorState(ClusterState clusterState, ErrorState errorState) {
         ReservedStateMetadata existingMetadata = clusterState.metadata().reservedStateMetadata().get(errorState.namespace());
 
         if (isNewError(existingMetadata, errorState.version()) == false) {
@@ -203,6 +275,10 @@ public class ReservedClusterStateService {
             return;
         }
 
+        submitErrorUpdateTask(errorState);
+    }
+
+    private void submitErrorUpdateTask(ErrorState errorState) {
         clusterService.submitStateUpdateTask(
             "reserved cluster state update error for [ " + errorState.namespace() + "]",
             new ReservedStateErrorTask(errorState, new ActionListener<>() {
@@ -221,11 +297,90 @@ public class ReservedClusterStateService {
         );
     }
 
+    /**
+     * Goes through all of the handlers, runs the validation and the transform part of the cluster state.
+     * <p>
+     * While running the handlers we also collect any non cluster state transformation consumer actions that
+     * need to be performed asynchronously before we attempt to save the cluster state. The trial run does not
+     * result in an update of the cluster state, it's only purpose is to verify if we can correctly perform a
+     * cluster state update with the given reserved state chunk.
+     *
+     * Package private for testing
+     */
+    TrialRunResult trialRun(
+        String namespace,
+        ClusterState currentState,
+        ReservedStateChunk stateChunk,
+        LinkedHashSet<String> orderedHandlers
+    ) {
+        ReservedStateMetadata existingMetadata = currentState.metadata().reservedStateMetadata().get(namespace);
+        Map<String, Object> reservedState = stateChunk.state();
+
+        List<String> errors = new ArrayList<>();
+        List<Consumer<ActionListener<NonStateTransformResult>>> nonStateTransforms = new ArrayList<>();
+
+        ClusterState state = currentState;
+
+        for (var handlerName : orderedHandlers) {
+            ReservedClusterStateHandler<?> handler = handlers.get(handlerName);
+            try {
+                Set<String> existingKeys = keysForHandler(existingMetadata, handlerName);
+                TransformState transformState = handler.transform(reservedState.get(handlerName), new TransformState(state, existingKeys));
+                state = transformState.state();
+                if (transformState.nonStateTransform() != null) {
+                    nonStateTransforms.add(transformState.nonStateTransform());
+                }
+            } catch (Exception e) {
+                errors.add(format("Error processing %s state change: %s", handler.name(), stackTrace(e)));
+            }
+        }
+
+        return new TrialRunResult(nonStateTransforms, errors);
+    }
+
+    /**
+     * Runs the non cluster state transformations asynchronously, collecting the {@link NonStateTransformResult} objects.
+     * <p>
+     * Once all non cluster state transformations have completed, we submit the cluster state update task, which
+     * updates all of the handler state, including the keys produced by the non cluster state transforms. The new reserved
+     * state version isn't written to the cluster state until the cluster state task runs.
+     *
+     * Package private for testing
+     */
+    void executeNonStateTransformationSteps(
+        List<Consumer<ActionListener<NonStateTransformResult>>> nonStateTransforms,
+        ActionListener<Collection<NonStateTransformResult>> listener
+    ) {
+        // Don't create grouped listener with 0 actions, just return
+        if (nonStateTransforms.isEmpty()) {
+            listener.onResponse(List.of());
+            return;
+        }
+
+        GroupedActionListener<NonStateTransformResult> postTasksListener = new GroupedActionListener<>(new ActionListener<>() {
+            @Override
+            public void onResponse(Collection<NonStateTransformResult> updateKeyTaskResult) {
+                listener.onResponse(updateKeyTaskResult);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                listener.onFailure(e);
+            }
+        }, nonStateTransforms.size());
+
+        for (var transform : nonStateTransforms) {
+            // non cluster state transforms don't modify the cluster state, they however are given a chance to return a more
+            // up-to-date version of the modified keys we should save in the reserved state. These calls are
+            // async and report back when they are done through the postTasksListener.
+            transform.accept(postTasksListener);
+        }
+    }
+
     /**
      * Returns an ordered set ({@link LinkedHashSet}) of the cluster state handlers that need to
      * execute for a given list of handler names supplied through the {@link ReservedStateChunk}.
      * @param handlerNames Names of handlers found in the {@link ReservedStateChunk}
-     * @return
      */
     LinkedHashSet<String> orderedStateHandlers(Set<String> handlerNames) {
         LinkedHashSet<String> orderedHandlers = new LinkedHashSet<>();
@@ -280,9 +435,14 @@ public class ReservedClusterStateService {
 
     /**
      * Adds additional {@link ReservedClusterStateHandler} to the handler registry
-     * @param handler
+     * @param handler an additional reserved state handler to be added
      */
     public void installStateHandler(ReservedClusterStateHandler<?> handler) {
         this.handlers.put(handler.name(), handler);
     }
+
+    /**
+     * Helper record class to combine the result of a trial run, non cluster state actions and any errors
+     */
+    record TrialRunResult(List<Consumer<ActionListener<NonStateTransformResult>>> nonStateTransforms, List<String> errors) {}
 }

+ 25 - 11
server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTask.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
 import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
 import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
+import org.elasticsearch.reservedstate.NonStateTransformResult;
 import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
 import org.elasticsearch.reservedstate.TransformState;
 
@@ -49,10 +50,12 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
     private final Collection<String> orderedHandlers;
     private final BiConsumer<ClusterState, ErrorState> errorReporter;
     private final ActionListener<ActionResponse.Empty> listener;
+    private final Collection<NonStateTransformResult> nonStateTransformResults;
 
     public ReservedStateUpdateTask(
         String namespace,
         ReservedStateChunk stateChunk,
+        Collection<NonStateTransformResult> nonStateTransformResults,
         Map<String, ReservedClusterStateHandler<?>> handlers,
         Collection<String> orderedHandlers,
         BiConsumer<ClusterState, ErrorState> errorReporter,
@@ -60,6 +63,7 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
     ) {
         this.namespace = namespace;
         this.stateChunk = stateChunk;
+        this.nonStateTransformResults = nonStateTransformResults;
         this.handlers = handlers;
         this.orderedHandlers = orderedHandlers;
         this.errorReporter = errorReporter;
@@ -88,6 +92,7 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
         List<String> errors = new ArrayList<>();
 
         ClusterState state = currentState;
+        // Transform the cluster state first
         for (var handlerName : orderedHandlers) {
             ReservedClusterStateHandler<?> handler = handlers.get(handlerName);
             try {
@@ -100,9 +105,26 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
             }
         }
 
+        checkAndThrowOnError(errors, currentState, reservedStateVersion);
+
+        // Once we have set all of the handler state from the cluster state update tasks, we add the reserved keys
+        // from the non cluster state transforms.
+        for (var transform : nonStateTransformResults) {
+            reservedMetadataBuilder.putHandler(new ReservedStateHandlerMetadata(transform.handlerName(), transform.updatedKeys()));
+        }
+
+        // Remove the last error if we had previously encountered any in prior processing of reserved state
+        reservedMetadataBuilder.errorMetadata(null);
+
+        ClusterState.Builder stateBuilder = new ClusterState.Builder(state);
+        Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(reservedMetadataBuilder.build());
+
+        return stateBuilder.metadata(metadataBuilder).build();
+    }
+
+    private void checkAndThrowOnError(List<String> errors, ClusterState currentState, ReservedStateVersion reservedStateVersion) {
+        // Any errors should be discovered through validation performed in the transform calls
         if (errors.isEmpty() == false) {
-            // Check if we had previous error metadata with version information, don't spam with cluster state updates, if the
-            // version hasn't been updated.
             logger.debug("Error processing state change request for [{}] with the following errors [{}]", namespace, errors);
 
             var errorState = new ErrorState(
@@ -116,17 +138,9 @@ public class ReservedStateUpdateTask implements ClusterStateTaskListener {
 
             throw new IllegalStateException("Error processing state change request for " + namespace + ", errors: " + errorState);
         }
-
-        // remove the last error if we had previously encountered any
-        reservedMetadataBuilder.errorMetadata(null);
-
-        ClusterState.Builder stateBuilder = new ClusterState.Builder(state);
-        Metadata.Builder metadataBuilder = Metadata.builder(state.metadata()).put(reservedMetadataBuilder.build());
-
-        return stateBuilder.metadata(metadataBuilder).build();
     }
 
-    private Set<String> keysForHandler(ReservedStateMetadata reservedStateMetadata, String handlerName) {
+    static Set<String> keysForHandler(ReservedStateMetadata reservedStateMetadata, String handlerName) {
         if (reservedStateMetadata == null || reservedStateMetadata.handlers().get(handlerName) == null) {
             return Collections.emptySet();
         }

+ 1 - 0
server/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateUpdateTaskExecutor.java

@@ -29,6 +29,7 @@ public record ReservedStateUpdateTaskExecutor(RerouteService rerouteService) imp
     @Override
     public ClusterState execute(BatchExecutionContext<ReservedStateUpdateTask> batchExecutionContext) throws Exception {
         var updatedState = batchExecutionContext.initialState();
+
         for (final var taskContext : batchExecutionContext.taskContexts()) {
             try (var ignored = taskContext.captureResponseHeaders()) {
                 updatedState = taskContext.getTask().execute(updatedState);

+ 154 - 0
server/src/test/java/org/elasticsearch/action/support/ReservedStateAwareHandledTransportActionTests.java

@@ -0,0 +1,154 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
+import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class ReservedStateAwareHandledTransportActionTests extends ESTestCase {
+    public void testRejectImmutableConflictClusterStateUpdate() {
+        ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata(ReservedClusterSettingsAction.NAME, Set.of("a", "b"));
+        ReservedStateHandlerMetadata hmThree = new ReservedStateHandlerMetadata(ReservedClusterSettingsAction.NAME, Set.of("e", "f"));
+        ReservedStateMetadata omOne = ReservedStateMetadata.builder("namespace_one").putHandler(hmOne).build();
+        ReservedStateMetadata omTwo = ReservedStateMetadata.builder("namespace_two").putHandler(hmThree).build();
+
+        Metadata metadata = Metadata.builder().put(omOne).put(omTwo).build();
+
+        ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(metadata).build();
+        ClusterService clusterService = mock(ClusterService.class);
+        doReturn(clusterState).when(clusterService).state();
+
+        Action handler = new Action("internal:testAction", clusterService, mock(TransportService.class), mock(ActionFilters.class));
+
+        // nothing should happen here, since the request doesn't touch any of the immutable state keys
+        var future = new PlainActionFuture<FakeResponse>();
+        handler.doExecute(mock(Task.class), new DummyRequest(), future);
+        assertNotNull(future.actionGet());
+
+        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
+            Settings.builder().put("a", "a value").build()
+        ).transientSettings(Settings.builder().put("e", "e value").build());
+
+        FakeReservedStateAwareAction action = new FakeReservedStateAwareAction(
+            "internal:testClusterSettings",
+            clusterService,
+            mock(TransportService.class),
+            mock(ActionFilters.class),
+            null
+        );
+
+        assertTrue(expectThrows(IllegalArgumentException.class, () -> action.doExecute(mock(Task.class), request, new ActionListener<>() {
+            @Override
+            public void onResponse(FakeResponse fakeResponse) {
+                fail("Shouldn't reach here");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assertNotNull(e);
+            }
+        })).getMessage().contains("with errors: [[a] set as read-only by [namespace_one], " + "[e] set as read-only by [namespace_two]"));
+
+        ClusterUpdateSettingsRequest okRequest = new ClusterUpdateSettingsRequest().persistentSettings(
+            Settings.builder().put("m", "m value").build()
+        ).transientSettings(Settings.builder().put("n", "n value").build());
+
+        // this should just work, no conflicts
+        action.doExecute(mock(Task.class), okRequest, new ActionListener<>() {
+            @Override
+            public void onResponse(FakeResponse fakeResponse) {
+                assertNotNull(fakeResponse);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                fail("Shouldn't reach here");
+            }
+        });
+    }
+
+    static class Action extends ReservedStateAwareHandledTransportAction<DummyRequest, FakeResponse> {
+        protected Action(String actionName, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters) {
+            super(actionName, clusterService, transportService, actionFilters, null);
+        }
+
+        @Override
+        public Optional<String> reservedStateHandlerName() {
+            return Optional.of("test_reserved_state_action");
+        }
+
+        @Override
+        protected void doExecuteProtected(Task task, DummyRequest request, ActionListener<FakeResponse> listener) {
+            listener.onResponse(new FakeResponse());
+        }
+    }
+
+    static class DummyRequest extends ActionRequest {
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+    }
+
+    static class FakeResponse extends ActionResponse {
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {}
+    }
+
+    static class FakeReservedStateAwareAction extends ReservedStateAwareHandledTransportAction<ClusterUpdateSettingsRequest, FakeResponse> {
+        protected FakeReservedStateAwareAction(
+            String actionName,
+            ClusterService clusterService,
+            TransportService transportService,
+            ActionFilters actionFilters,
+            Writeable.Reader<ClusterUpdateSettingsRequest> clusterUpdateSettingsRequestReader
+        ) {
+            super(actionName, clusterService, transportService, actionFilters, clusterUpdateSettingsRequestReader);
+        }
+
+        @Override
+        protected void doExecuteProtected(Task task, ClusterUpdateSettingsRequest request, ActionListener<FakeResponse> listener) {
+            listener.onResponse(new FakeResponse());
+        }
+
+        @Override
+        public Optional<String> reservedStateHandlerName() {
+            return Optional.of(ReservedClusterSettingsAction.NAME);
+        }
+
+        @Override
+        public Set<String> modifiedKeys(ClusterUpdateSettingsRequest request) {
+            Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
+            return allSettings.keySet();
+        }
+    }
+}

+ 9 - 9
server/src/test/java/org/elasticsearch/action/support/master/TransportMasterNodeActionTests.java

@@ -266,7 +266,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
         }
 
         @Override
-        protected Optional<String> reservedStateHandlerName() {
+        public Optional<String> reservedStateHandlerName() {
             return Optional.of("test_reserved_state_action");
         }
     }
@@ -306,12 +306,12 @@ public class TransportMasterNodeActionTests extends ESTestCase {
         }
 
         @Override
-        protected Optional<String> reservedStateHandlerName() {
+        public Optional<String> reservedStateHandlerName() {
             return Optional.of(ReservedClusterSettingsAction.NAME);
         }
 
         @Override
-        protected Set<String> modifiedKeys(ClusterUpdateSettingsRequest request) {
+        public Set<String> modifiedKeys(ClusterUpdateSettingsRequest request) {
             Settings allSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
             return allSettings.keySet();
         }
@@ -771,14 +771,14 @@ public class TransportMasterNodeActionTests extends ESTestCase {
 
         Action noHandler = new Action("internal:testAction", transportService, clusterService, threadPool, ThreadPool.Names.SAME);
 
-        assertFalse(noHandler.supportsImmutableState());
+        assertFalse(noHandler.supportsReservedState());
 
         noHandler = new ReservedStateAction("internal:testOpAction", transportService, clusterService, threadPool);
 
-        assertTrue(noHandler.supportsImmutableState());
+        assertTrue(noHandler.supportsReservedState());
 
         // nothing should happen here, since the request doesn't touch any of the immutable state keys
-        noHandler.validateForImmutableState(new Request(), clusterState);
+        noHandler.validateForReservedState(new Request(), clusterState);
 
         ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().persistentSettings(
             Settings.builder().put("a", "a value").build()
@@ -792,10 +792,10 @@ public class TransportMasterNodeActionTests extends ESTestCase {
             ThreadPool.Names.SAME
         );
 
-        assertTrue(action.supportsImmutableState());
+        assertTrue(action.supportsReservedState());
 
         assertTrue(
-            expectThrows(IllegalArgumentException.class, () -> action.validateForImmutableState(request, clusterState)).getMessage()
+            expectThrows(IllegalArgumentException.class, () -> action.validateForReservedState(request, clusterState)).getMessage()
                 .contains("with errors: [[a] set as read-only by [namespace_one], " + "[e] set as read-only by [namespace_two]")
         );
 
@@ -804,7 +804,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
         ).transientSettings(Settings.builder().put("n", "n value").build());
 
         // this should just work, no conflicts
-        action.validateForImmutableState(okRequest, clusterState);
+        action.validateForReservedState(okRequest, clusterState);
     }
 
     private Runnable blockAllThreads(String executorName) throws Exception {

+ 199 - 2
server/src/test/java/org/elasticsearch/reservedstate/service/ReservedClusterStateServiceTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.Releasable;
+import org.elasticsearch.reservedstate.NonStateTransformResult;
 import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
 import org.elasticsearch.reservedstate.TransformState;
 import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
@@ -33,20 +34,28 @@ import org.elasticsearch.xcontent.XContentParserConfiguration;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.reservedstate.service.ReservedStateUpdateTask.checkMetadataVersion;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -141,6 +150,7 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
             new ReservedStateUpdateTask(
                 "test",
                 null,
+                List.of(),
                 Collections.emptyMap(),
                 Collections.emptySet(),
                 (clusterState, errorState) -> {},
@@ -321,14 +331,18 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
         assertTrue(ReservedClusterStateService.isNewError(operatorMetadata, 3L));
         assertTrue(ReservedClusterStateService.isNewError(null, 1L));
 
+        var chunk = new ReservedStateChunk(Map.of("one", "two", "maker", "three"), new ReservedStateVersion(2L, Version.CURRENT));
+        var orderedHandlers = List.of(exceptionThrower.name(), newStateMaker.name());
+
         // We submit a task with two handler, one will cause an exception, the other will create a new state.
         // When we fail to update the metadata because of version, we ensure that the returned state is equal to the
         // original state by pointer reference to avoid cluster state update task to run.
         ReservedStateUpdateTask task = new ReservedStateUpdateTask(
             "namespace_one",
-            new ReservedStateChunk(Map.of("one", "two", "maker", "three"), new ReservedStateVersion(2L, Version.CURRENT)),
+            chunk,
+            List.of(),
             Map.of(exceptionThrower.name(), exceptionThrower, newStateMaker.name(), newStateMaker),
-            List.of(exceptionThrower.name(), newStateMaker.name()),
+            orderedHandlers,
             (clusterState, errorState) -> { assertFalse(ReservedClusterStateService.isNewError(operatorMetadata, errorState.version())); },
             new ActionListener<>() {
                 @Override
@@ -339,6 +353,14 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
             }
         );
 
+        ClusterService clusterService = mock(ClusterService.class);
+        final var controller = spy(new ReservedClusterStateService(clusterService, List.of(newStateMaker, exceptionThrower)));
+
+        var trialRunResult = controller.trialRun("namespace_one", state, chunk, new LinkedHashSet<>(orderedHandlers));
+        assertEquals(0, trialRunResult.nonStateTransforms().size());
+        assertEquals(1, trialRunResult.errors().size());
+        assertTrue(trialRunResult.errors().get(0).contains("Error processing one state change:"));
+
         // We exit on duplicate errors before we update the cluster state error metadata
         assertThat(
             expectThrows(IllegalStateException.class, () -> task.execute(state)).getMessage(),
@@ -460,6 +482,181 @@ public class ReservedClusterStateServiceTests extends ESTestCase {
         );
     }
 
+    public void testCheckAndReportError() {
+        ClusterService clusterService = mock(ClusterService.class);
+        final var controller = spy(new ReservedClusterStateService(clusterService, List.of()));
+
+        assertNull(controller.checkAndReportError("test", List.of(), null, null));
+        verify(controller, times(0)).saveErrorState(any(), any());
+
+        var state = ClusterState.builder(new ClusterName("elasticsearch")).build();
+        var version = new ReservedStateVersion(2L, Version.CURRENT);
+        var error = controller.checkAndReportError("test", List.of("test error"), state, version);
+        assertThat(error, allOf(notNullValue(), instanceOf(IllegalStateException.class)));
+        assertEquals("Error processing state change request for test, errors: test error", error.getMessage());
+        verify(controller, times(1)).saveErrorState(any(), any());
+    }
+
+    public void testTrialRunExtractsNonStateActions() {
+        ReservedClusterStateHandler<Map<String, Object>> newStateMaker = new ReservedClusterStateHandler<>() {
+            @Override
+            public String name() {
+                return "maker";
+            }
+
+            @Override
+            public TransformState transform(Object source, TransformState prevState) throws Exception {
+                ClusterState newState = new ClusterState.Builder(prevState.state()).build();
+                return new TransformState(newState, prevState.keys());
+            }
+
+            @Override
+            public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
+                return parser.map();
+            }
+        };
+
+        ReservedClusterStateHandler<Map<String, Object>> exceptionThrower = new ReservedClusterStateHandler<>() {
+            @Override
+            public String name() {
+                return "non-state";
+            }
+
+            @Override
+            public TransformState transform(Object source, TransformState prevState) {
+                return new TransformState(prevState.state(), prevState.keys(), (l) -> internalKeys(l));
+            }
+
+            private void internalKeys(ActionListener<NonStateTransformResult> listener) {
+                listener.onResponse(new NonStateTransformResult(name(), Set.of("key non-state")));
+            }
+
+            @Override
+            public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
+                return parser.map();
+            }
+        };
+
+        ReservedStateHandlerMetadata hmOne = new ReservedStateHandlerMetadata("non-state", Set.of("a", "b"));
+        ReservedStateErrorMetadata emOne = new ReservedStateErrorMetadata(
+            2L,
+            ReservedStateErrorMetadata.ErrorKind.VALIDATION,
+            List.of("Test error 1", "Test error 2")
+        );
+
+        final ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("namespace_one")
+            .errorMetadata(emOne)
+            .version(1L)
+            .putHandler(hmOne)
+            .build();
+
+        Metadata metadata = Metadata.builder().put(operatorMetadata).build();
+        ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build();
+
+        var chunk = new ReservedStateChunk(Map.of("non-state", "two", "maker", "three"), new ReservedStateVersion(2L, Version.CURRENT));
+        var orderedHandlers = List.of(exceptionThrower.name(), newStateMaker.name());
+
+        ClusterService clusterService = mock(ClusterService.class);
+        final var controller = spy(new ReservedClusterStateService(clusterService, List.of(newStateMaker, exceptionThrower)));
+
+        var trialRunResult = controller.trialRun("namespace_one", state, chunk, new LinkedHashSet<>(orderedHandlers));
+
+        assertEquals(1, trialRunResult.nonStateTransforms().size());
+        assertEquals(0, trialRunResult.errors().size());
+        trialRunResult.nonStateTransforms().get(0).accept(new ActionListener<>() {
+            @Override
+            public void onResponse(NonStateTransformResult nonStateTransformResult) {
+                assertThat(nonStateTransformResult.updatedKeys(), containsInAnyOrder("key non-state"));
+                assertEquals("non-state", nonStateTransformResult.handlerName());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                fail("Should not reach here");
+            }
+        });
+    }
+
+    public void testExecuteNonStateTransformationSteps() {
+        int count = randomInt(10);
+        var handlers = new ArrayList<ReservedClusterStateHandler<?>>();
+        var i = 0;
+        var builder = ReservedStateMetadata.builder("namespace_one").version(1L);
+        var chunkMap = new HashMap<String, Object>();
+
+        while (i < count) {
+            final var key = i++;
+            var handler = new ReservedClusterStateHandler<>() {
+                @Override
+                public String name() {
+                    return "non-state:" + key;
+                }
+
+                @Override
+                public TransformState transform(Object source, TransformState prevState) {
+                    return new TransformState(prevState.state(), prevState.keys(), (l) -> internalKeys(l));
+                }
+
+                private void internalKeys(ActionListener<NonStateTransformResult> listener) {
+                    listener.onResponse(new NonStateTransformResult(name(), Set.of("key non-state:" + key)));
+                }
+
+                @Override
+                public Map<String, Object> fromXContent(XContentParser parser) throws IOException {
+                    return parser.map();
+                }
+            };
+
+            builder.putHandler(new ReservedStateHandlerMetadata(handler.name(), Set.of("a", "b")));
+            handlers.add(handler);
+            chunkMap.put(handler.name(), i);
+        }
+
+        final ReservedStateMetadata operatorMetadata = ReservedStateMetadata.builder("namespace_one").version(1L).build();
+
+        Metadata metadata = Metadata.builder().put(operatorMetadata).build();
+        ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build();
+
+        var chunk = new ReservedStateChunk(chunkMap, new ReservedStateVersion(2L, Version.CURRENT));
+
+        ClusterService clusterService = mock(ClusterService.class);
+        final var controller = spy(new ReservedClusterStateService(clusterService, handlers));
+
+        var trialRunResult = controller.trialRun(
+            "namespace_one",
+            state,
+            chunk,
+            new LinkedHashSet<>(handlers.stream().map(h -> h.name()).toList())
+        );
+
+        assertEquals(count, trialRunResult.nonStateTransforms().size());
+        controller.executeNonStateTransformationSteps(trialRunResult.nonStateTransforms(), new ActionListener<>() {
+            @Override
+            public void onResponse(Collection<NonStateTransformResult> nonStateTransformResults) {
+                assertEquals(count, nonStateTransformResults.size());
+                var expectedHandlers = new ArrayList<String>();
+                var expectedValues = new ArrayList<String>();
+                for (int i = 0; i < count; i++) {
+                    expectedHandlers.add("non-state:" + i);
+                    expectedValues.add("key non-state:" + i);
+                }
+                assertThat(
+                    nonStateTransformResults.stream().map(n -> n.handlerName()).collect(Collectors.toSet()),
+                    containsInAnyOrder(expectedHandlers.toArray(new String[0]))
+                );
+                assertThat(
+                    nonStateTransformResults.stream().map(n -> n.updatedKeys()).flatMap(Set::stream).collect(Collectors.toSet()),
+                    containsInAnyOrder(expectedValues.toArray(new String[0]))
+                );
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                fail("Shouldn't reach here");
+            }
+        });
+    }
+
     class TestHandler implements ReservedClusterStateHandler<Map<String, Object>> {
 
         @Override

+ 2 - 2
x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportDeleteAutoscalingPolicyAction.java

@@ -123,12 +123,12 @@ public class TransportDeleteAutoscalingPolicyAction extends AcknowledgedTranspor
     }
 
     @Override
-    protected Optional<String> reservedStateHandlerName() {
+    public Optional<String> reservedStateHandlerName() {
         return Optional.of(ReservedAutoscalingPolicyAction.NAME);
     }
 
     @Override
-    protected Set<String> modifiedKeys(DeleteAutoscalingPolicyAction.Request request) {
+    public Set<String> modifiedKeys(DeleteAutoscalingPolicyAction.Request request) {
         return Set.of(request.name());
     }
 }

+ 2 - 2
x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/action/TransportPutAutoscalingPolicyAction.java

@@ -188,12 +188,12 @@ public class TransportPutAutoscalingPolicyAction extends AcknowledgedTransportMa
     }
 
     @Override
-    protected Optional<String> reservedStateHandlerName() {
+    public Optional<String> reservedStateHandlerName() {
         return Optional.of(ReservedAutoscalingPolicyAction.NAME);
     }
 
     @Override
-    protected Set<String> modifiedKeys(PutAutoscalingPolicyAction.Request request) {
+    public Set<String> modifiedKeys(PutAutoscalingPolicyAction.Request request) {
         return Set.of(request.name());
     }
 }

+ 17 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/rolemapping/PutRoleMappingRequest.java

@@ -59,6 +59,10 @@ public class PutRoleMappingRequest extends ActionRequest implements WriteRequest
 
     @Override
     public ActionRequestValidationException validate() {
+        return validate(true);
+    }
+
+    public ActionRequestValidationException validate(boolean validateMetadata) {
         ActionRequestValidationException validationException = null;
         if (name == null) {
             validationException = addValidationError("role-mapping name is missing", validationException);
@@ -72,7 +76,7 @@ public class PutRoleMappingRequest extends ActionRequest implements WriteRequest
         if (rules == null) {
             validationException = addValidationError("role-mapping rules are missing", validationException);
         }
-        if (MetadataUtils.containsReservedMetadata(metadata)) {
+        if (validateMetadata && MetadataUtils.containsReservedMetadata(metadata)) {
             validationException = addValidationError(
                 "metadata keys may not start with [" + MetadataUtils.RESERVED_PREFIX + "]",
                 validationException
@@ -162,4 +166,16 @@ public class PutRoleMappingRequest extends ActionRequest implements WriteRequest
     public ExpressionRoleMapping getMapping() {
         return new ExpressionRoleMapping(name, rules, roles, roleTemplates, metadata, enabled);
     }
+
+    public static PutRoleMappingRequest fromMapping(ExpressionRoleMapping mapping) {
+        var request = new PutRoleMappingRequest();
+        request.setName(mapping.getName());
+        request.setEnabled(mapping.isEnabled());
+        request.setRoles(mapping.getRoles());
+        request.setRoleTemplates(mapping.getRoleTemplates());
+        request.setRules(mapping.getExpression());
+        request.setMetadata(mapping.getMetadata());
+
+        return request;
+    }
 }

+ 2 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java

@@ -120,12 +120,12 @@ public class TransportDeleteLifecycleAction extends TransportMasterNodeAction<Re
     }
 
     @Override
-    protected Optional<String> reservedStateHandlerName() {
+    public Optional<String> reservedStateHandlerName() {
         return Optional.of(ReservedLifecycleAction.NAME);
     }
 
     @Override
-    protected Set<String> modifiedKeys(Request request) {
+    public Set<String> modifiedKeys(Request request) {
         return Set.of(request.getPolicyName());
     }
 }

+ 2 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java

@@ -311,12 +311,12 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction<Reque
     }
 
     @Override
-    protected Optional<String> reservedStateHandlerName() {
+    public Optional<String> reservedStateHandlerName() {
         return Optional.of(ReservedLifecycleAction.NAME);
     }
 
     @Override
-    protected Set<String> modifiedKeys(Request request) {
+    public Set<String> modifiedKeys(Request request) {
         return Set.of(request.getPolicy().getName());
     }
 }

+ 2 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportDeleteSnapshotLifecycleAction.java

@@ -145,12 +145,12 @@ public class TransportDeleteSnapshotLifecycleAction extends TransportMasterNodeA
     }
 
     @Override
-    protected Optional<String> reservedStateHandlerName() {
+    public Optional<String> reservedStateHandlerName() {
         return Optional.of(ReservedSnapshotAction.NAME);
     }
 
     @Override
-    protected Set<String> modifiedKeys(DeleteSnapshotLifecycleAction.Request request) {
+    public Set<String> modifiedKeys(DeleteSnapshotLifecycleAction.Request request) {
         return Set.of(request.getLifecycleId());
     }
 }

+ 2 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/action/TransportPutSnapshotLifecycleAction.java

@@ -179,12 +179,12 @@ public class TransportPutSnapshotLifecycleAction extends TransportMasterNodeActi
     }
 
     @Override
-    protected Optional<String> reservedStateHandlerName() {
+    public Optional<String> reservedStateHandlerName() {
         return Optional.of(ReservedSnapshotAction.NAME);
     }
 
     @Override
-    protected Set<String> modifiedKeys(PutSnapshotLifecycleAction.Request request) {
+    public Set<String> modifiedKeys(PutSnapshotLifecycleAction.Request request) {
         return Set.of(request.getLifecycleId());
     }
 }

+ 440 - 0
x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/RoleMappingFileSettingsIT.java

@@ -0,0 +1,440 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.integration;
+
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
+import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.metadata.ReservedStateErrorMetadata;
+import org.elasticsearch.cluster.metadata.ReservedStateHandlerMetadata;
+import org.elasticsearch.cluster.metadata.ReservedStateMetadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.reservedstate.action.ReservedClusterSettingsAction;
+import org.elasticsearch.reservedstate.service.FileSettingsService;
+import org.elasticsearch.test.NativeRealmIntegTestCase;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xpack.core.security.action.rolemapping.GetRoleMappingsAction;
+import org.elasticsearch.xpack.core.security.action.rolemapping.GetRoleMappingsRequest;
+import org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingAction;
+import org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingRequest;
+import org.elasticsearch.xpack.core.security.authc.support.mapper.ExpressionRoleMapping;
+import org.elasticsearch.xpack.security.action.rolemapping.ReservedRoleMappingAction;
+import org.junit.After;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
+import static org.elasticsearch.xcontent.XContentType.JSON;
+import static org.elasticsearch.xpack.core.security.test.TestRestrictedIndices.INTERNAL_SECURITY_MAIN_INDEX_7;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.notNullValue;
+
+/**
+ * Tests that file settings service can properly add role mappings and detect REST clashes
+ * with the reserved role mappings.
+ */
+public class RoleMappingFileSettingsIT extends NativeRealmIntegTestCase {
+
+    private static AtomicLong versionCounter = new AtomicLong(1);
+
+    private static String emptyJSON = """
+        {
+             "metadata": {
+                 "version": "%s",
+                 "compatibility": "8.4.0"
+             },
+             "state": {
+                "cluster_settings": {},
+                "role_mappings": {}
+             }
+        }""";
+
+    private static String testJSON = """
+        {
+             "metadata": {
+                 "version": "%s",
+                 "compatibility": "8.4.0"
+             },
+             "state": {
+                 "cluster_settings": {
+                     "indices.recovery.max_bytes_per_sec": "50mb"
+                 },
+                 "role_mappings": {
+                       "everyone_kibana": {
+                          "enabled": true,
+                          "roles": [ "kibana_user" ],
+                          "rules": { "field": { "username": "*" } },
+                          "metadata": {
+                             "uuid" : "b9a59ba9-6b92-4be2-bb8d-02bb270cb3a7",
+                             "_foo": "something"
+                          }
+                       },
+                       "everyone_fleet": {
+                          "enabled": true,
+                          "roles": [ "fleet_user" ],
+                          "rules": { "field": { "username": "*" } },
+                          "metadata": {
+                             "uuid" : "b9a59ba9-6b92-4be3-bb8d-02bb270cb3a7",
+                             "_foo": "something_else"
+                          }
+                       }
+                 }
+             }
+        }""";
+
+    private static String testErrorJSON = """
+        {
+             "metadata": {
+                 "version": "%s",
+                 "compatibility": "8.4.0"
+             },
+             "state": {
+                 "role_mappings": {
+                       "everyone_fleet_ok": {
+                          "enabled": true,
+                          "roles": [ "fleet_user" ],
+                          "rules": { "field": { "username": "*" } },
+                          "metadata": {
+                             "uuid" : "b9a59ba9-6b92-4be3-bb8d-02bb270cb3a7"
+                          }
+                       },
+                       "everyone_kibana_bad": {
+                          "enabled": true,
+                          "roles": [ "kibana_user" ],
+                          "metadata": {
+                             "uuid" : "b9a59ba9-6b92-4be2-bb8d-02bb270cb3a7"
+                          }
+                       }
+                 }
+             }
+        }""";
+
+    @After
+    public void cleanUp() throws IOException {
+        var fileSettingsService = internalCluster().getInstance(FileSettingsService.class, internalCluster().getMasterName());
+        Files.deleteIfExists(fileSettingsService.operatorSettingsFile());
+
+        ClusterUpdateSettingsResponse settingsResponse = client().admin()
+            .cluster()
+            .prepareUpdateSettings()
+            .setPersistentSettings(Settings.builder().putNull("indices.recovery.max_bytes_per_sec"))
+            .get();
+        assertTrue(settingsResponse.isAcknowledged());
+    }
+
+    private void writeJSONFile(String node, String json) throws Exception {
+        long version = versionCounter.incrementAndGet();
+
+        FileSettingsService fileSettingsService = internalCluster().getInstance(FileSettingsService.class, node);
+        assertTrue(fileSettingsService.watching());
+
+        Files.deleteIfExists(fileSettingsService.operatorSettingsFile());
+
+        Files.createDirectories(fileSettingsService.operatorSettingsDir());
+        Path tempFilePath = createTempFile();
+
+        logger.info("--> writing JSON config to node {} with path {}", node, tempFilePath);
+        logger.info(Strings.format(json, version));
+        Files.write(tempFilePath, Strings.format(json, version).getBytes(StandardCharsets.UTF_8));
+        Files.move(tempFilePath, fileSettingsService.operatorSettingsFile(), StandardCopyOption.ATOMIC_MOVE);
+    }
+
+    private Tuple<CountDownLatch, AtomicLong> setupClusterStateListener(String node) {
+        ClusterService clusterService = internalCluster().clusterService(node);
+        CountDownLatch savedClusterState = new CountDownLatch(1);
+        AtomicLong metadataVersion = new AtomicLong(-1);
+        clusterService.addListener(new ClusterStateListener() {
+            @Override
+            public void clusterChanged(ClusterChangedEvent event) {
+                ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
+                if (reservedState != null) {
+                    ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedRoleMappingAction.NAME);
+                    if (handlerMetadata != null && handlerMetadata.keys().contains("everyone_kibana")) {
+                        clusterService.removeListener(this);
+                        metadataVersion.set(event.state().metadata().version());
+                        savedClusterState.countDown();
+                    }
+                }
+            }
+        });
+
+        return new Tuple<>(savedClusterState, metadataVersion);
+    }
+
+    private Tuple<CountDownLatch, AtomicLong> setupClusterStateListenerForCleanup(String node) {
+        ClusterService clusterService = internalCluster().clusterService(node);
+        CountDownLatch savedClusterState = new CountDownLatch(1);
+        AtomicLong metadataVersion = new AtomicLong(-1);
+        clusterService.addListener(new ClusterStateListener() {
+            @Override
+            public void clusterChanged(ClusterChangedEvent event) {
+                ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
+                if (reservedState != null) {
+                    ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedClusterSettingsAction.NAME);
+                    if (handlerMetadata == null || handlerMetadata.keys().isEmpty()) {
+                        clusterService.removeListener(this);
+                        metadataVersion.set(event.state().metadata().version());
+                        savedClusterState.countDown();
+                    }
+                }
+            }
+        });
+
+        return new Tuple<>(savedClusterState, metadataVersion);
+    }
+
+    private void assertRoleMappingsSaveOK(CountDownLatch savedClusterState, AtomicLong metadataVersion) throws Exception {
+        boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS);
+        assertTrue(awaitSuccessful);
+
+        final ClusterStateResponse clusterStateResponse = client().admin()
+            .cluster()
+            .state(new ClusterStateRequest().waitForMetadataVersion(metadataVersion.get()))
+            .get();
+
+        ReservedStateMetadata reservedState = clusterStateResponse.getState()
+            .metadata()
+            .reservedStateMetadata()
+            .get(FileSettingsService.NAMESPACE);
+
+        ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedRoleMappingAction.NAME);
+        assertThat(handlerMetadata.keys(), allOf(notNullValue(), containsInAnyOrder("everyone_kibana", "everyone_fleet")));
+
+        ReservedStateHandlerMetadata clusterStateHandlerMetadata = reservedState.handlers().get(ReservedClusterSettingsAction.NAME);
+        assertThat(
+            clusterStateHandlerMetadata.keys(),
+            allOf(notNullValue(), containsInAnyOrder(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()))
+        );
+
+        assertThat(
+            clusterStateResponse.getState().metadata().persistentSettings().get(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey()),
+            equalTo("50mb")
+        );
+
+        ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings(
+            Settings.builder().put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "1234kb")
+        );
+        assertEquals(
+            "java.lang.IllegalArgumentException: Failed to process request "
+                + "[org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest/unset] "
+                + "with errors: [[indices.recovery.max_bytes_per_sec] set as read-only by [file_settings]]",
+            expectThrows(ExecutionException.class, () -> client().admin().cluster().updateSettings(req).get()).getMessage()
+        );
+
+        var request = new GetRoleMappingsRequest();
+        request.setNames("everyone_kibana", "everyone_fleet");
+        var response = client().execute(GetRoleMappingsAction.INSTANCE, request).get();
+        assertTrue(response.hasMappings());
+        assertThat(
+            Arrays.stream(response.mappings()).map(r -> r.getName()).collect(Collectors.toSet()),
+            allOf(notNullValue(), containsInAnyOrder("everyone_kibana", "everyone_fleet"))
+        );
+
+        // Try using the REST API to update the everyone_kibana role mapping
+        // This should fail, we have reserved certain role mappings in operator mode
+        assertEquals(
+            "Failed to process request "
+                + "[org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingRequest/unset] "
+                + "with errors: [[everyone_kibana] set as read-only by [file_settings]]",
+            expectThrows(
+                IllegalArgumentException.class,
+                () -> client().execute(PutRoleMappingAction.INSTANCE, sampleRestRequest("everyone_kibana")).actionGet()
+            ).getMessage()
+        );
+        assertEquals(
+            "Failed to process request "
+                + "[org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingRequest/unset] "
+                + "with errors: [[everyone_fleet] set as read-only by [file_settings]]",
+            expectThrows(
+                IllegalArgumentException.class,
+                () -> client().execute(PutRoleMappingAction.INSTANCE, sampleRestRequest("everyone_fleet")).actionGet()
+            ).getMessage()
+        );
+    }
+
+    public void testRoleMappingsApplied() throws Exception {
+        ensureGreen();
+
+        var savedClusterState = setupClusterStateListener(internalCluster().getMasterName());
+        writeJSONFile(internalCluster().getMasterName(), testJSON);
+
+        assertRoleMappingsSaveOK(savedClusterState.v1(), savedClusterState.v2());
+        logger.info("---> cleanup cluster settings...");
+
+        savedClusterState = setupClusterStateListenerForCleanup(internalCluster().getMasterName());
+
+        writeJSONFile(internalCluster().getMasterName(), emptyJSON);
+        boolean awaitSuccessful = savedClusterState.v1().await(20, TimeUnit.SECONDS);
+        assertTrue(awaitSuccessful);
+
+        final ClusterStateResponse clusterStateResponse = client().admin()
+            .cluster()
+            .state(new ClusterStateRequest().waitForMetadataVersion(savedClusterState.v2().get()))
+            .get();
+
+        assertNull(
+            clusterStateResponse.getState().metadata().persistentSettings().get(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey())
+        );
+
+        var request = new GetRoleMappingsRequest();
+        request.setNames("everyone_kibana", "everyone_fleet");
+        var response = client().execute(GetRoleMappingsAction.INSTANCE, request).get();
+        assertFalse(response.hasMappings());
+    }
+
+    private Tuple<CountDownLatch, AtomicLong> setupClusterStateListenerForError(String node) {
+        ClusterService clusterService = internalCluster().clusterService(node);
+        CountDownLatch savedClusterState = new CountDownLatch(1);
+        AtomicLong metadataVersion = new AtomicLong(-1);
+        clusterService.addListener(new ClusterStateListener() {
+            @Override
+            public void clusterChanged(ClusterChangedEvent event) {
+                ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
+                if (reservedState != null
+                    && reservedState.errorMetadata() != null
+                    && reservedState.errorMetadata().errorKind() == ReservedStateErrorMetadata.ErrorKind.PARSING) {
+                    clusterService.removeListener(this);
+                    metadataVersion.set(event.state().metadata().version());
+                    savedClusterState.countDown();
+                    assertEquals(ReservedStateErrorMetadata.ErrorKind.PARSING, reservedState.errorMetadata().errorKind());
+                    assertThat(reservedState.errorMetadata().errors(), allOf(notNullValue(), hasSize(1)));
+                    assertThat(
+                        reservedState.errorMetadata().errors().get(0),
+                        containsString("failed to parse role-mapping [everyone_kibana_bad]. missing field [rules]")
+                    );
+                }
+            }
+        });
+
+        return new Tuple<>(savedClusterState, metadataVersion);
+    }
+
+    private void assertRoleMappingsNotSaved(CountDownLatch savedClusterState, AtomicLong metadataVersion) throws Exception {
+        boolean awaitSuccessful = savedClusterState.await(20, TimeUnit.SECONDS);
+        assertTrue(awaitSuccessful);
+
+        // This should succeed, nothing was reserved
+        client().execute(PutRoleMappingAction.INSTANCE, sampleRestRequest("everyone_kibana_bad")).get();
+        client().execute(PutRoleMappingAction.INSTANCE, sampleRestRequest("everyone_fleet_ok")).get();
+    }
+
+    public void testErrorSaved() throws Exception {
+        ensureGreen();
+
+        var savedClusterState = setupClusterStateListenerForError(internalCluster().getMasterName());
+
+        writeJSONFile(internalCluster().getMasterName(), testErrorJSON);
+        assertRoleMappingsNotSaved(savedClusterState.v1(), savedClusterState.v2());
+    }
+
+    private Tuple<CountDownLatch, AtomicLong> setupClusterStateListenerForSecurityWriteError(String node) {
+        ClusterService clusterService = internalCluster().clusterService(node);
+        CountDownLatch savedClusterState = new CountDownLatch(1);
+        AtomicLong metadataVersion = new AtomicLong(-1);
+        clusterService.addListener(new ClusterStateListener() {
+            @Override
+            public void clusterChanged(ClusterChangedEvent event) {
+                ReservedStateMetadata reservedState = event.state().metadata().reservedStateMetadata().get(FileSettingsService.NAMESPACE);
+                if (reservedState != null
+                    && reservedState.errorMetadata() != null
+                    && reservedState.errorMetadata().errorKind() == ReservedStateErrorMetadata.ErrorKind.VALIDATION) {
+                    clusterService.removeListener(this);
+                    metadataVersion.set(event.state().metadata().version());
+                    savedClusterState.countDown();
+                    assertEquals(ReservedStateErrorMetadata.ErrorKind.VALIDATION, reservedState.errorMetadata().errorKind());
+                    assertThat(reservedState.errorMetadata().errors(), allOf(notNullValue(), hasSize(1)));
+                    assertThat(reservedState.errorMetadata().errors().get(0), containsString("closed"));
+                }
+            }
+        });
+
+        return new Tuple<>(savedClusterState, metadataVersion);
+    }
+
+    public void testRoleMappingFailsToWriteToStore() throws Exception {
+        ensureGreen();
+
+        var savedClusterState = setupClusterStateListenerForSecurityWriteError(internalCluster().getMasterName());
+
+        final CloseIndexResponse closeIndexResponse = client().admin()
+            .indices()
+            .close(new CloseIndexRequest(INTERNAL_SECURITY_MAIN_INDEX_7))
+            .get();
+        assertTrue(closeIndexResponse.isAcknowledged());
+
+        writeJSONFile(internalCluster().getMasterName(), testJSON);
+        boolean awaitSuccessful = savedClusterState.v1().await(20, TimeUnit.SECONDS);
+        assertTrue(awaitSuccessful);
+
+        var request = new GetRoleMappingsRequest();
+        request.setNames("everyone_kibana", "everyone_fleet");
+
+        var response = client().execute(GetRoleMappingsAction.INSTANCE, request).get();
+        assertFalse(response.hasMappings());
+
+        final ClusterStateResponse clusterStateResponse = client().admin()
+            .cluster()
+            .state(new ClusterStateRequest().waitForMetadataVersion(savedClusterState.v2().get()))
+            .get();
+
+        assertNull(
+            clusterStateResponse.getState().metadata().persistentSettings().get(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey())
+        );
+
+        ReservedStateMetadata reservedState = clusterStateResponse.getState()
+            .metadata()
+            .reservedStateMetadata()
+            .get(FileSettingsService.NAMESPACE);
+
+        ReservedStateHandlerMetadata handlerMetadata = reservedState.handlers().get(ReservedRoleMappingAction.NAME);
+        assertTrue(handlerMetadata == null || handlerMetadata.keys().isEmpty());
+    }
+
+    private PutRoleMappingRequest sampleRestRequest(String name) throws Exception {
+        var json = """
+            {
+                "enabled": false,
+                "roles": [ "kibana_user" ],
+                "rules": { "field": { "username": "*" } },
+                "metadata": {
+                    "uuid" : "b9a59ba9-6b92-4be2-bb8d-02bb270cb3a7"
+                }
+            }""";
+
+        try (
+            var bis = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
+            var parser = JSON.xContent().createParser(XContentParserConfiguration.EMPTY, bis)
+        ) {
+            ExpressionRoleMapping mapping = ExpressionRoleMapping.parse(name, parser);
+            return PutRoleMappingRequest.fromMapping(mapping);
+        }
+    }
+}

+ 4 - 0
x-pack/plugin/security/src/main/java/module-info.java

@@ -74,4 +74,8 @@ module org.elasticsearch.security {
             org.elasticsearch.xpack.security.crypto.tool.SystemKeyToolProvider,
             org.elasticsearch.xpack.security.authc.file.tool.UsersToolProvider,
             org.elasticsearch.xpack.security.enrollment.tool.AutoConfigGenerateElasticPasswordHashToolProvider;
+
+    provides org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider
+        with
+            org.elasticsearch.xpack.security.ReservedSecurityStateHandlerProvider;
 }

+ 33 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/ReservedSecurityStateHandlerProvider.java

@@ -0,0 +1,33 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security;
+
+import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
+import org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider;
+
+import java.util.Collection;
+
+/**
+ * Security Provider implementation for the {@link ReservedClusterStateHandlerProvider} service interface
+ */
+public class ReservedSecurityStateHandlerProvider implements ReservedClusterStateHandlerProvider {
+    private final Security plugin;
+
+    public ReservedSecurityStateHandlerProvider() {
+        throw new IllegalStateException("Provider must be constructed using PluginsService");
+    }
+
+    public ReservedSecurityStateHandlerProvider(Security plugin) {
+        this.plugin = plugin;
+    }
+
+    @Override
+    public Collection<ReservedClusterStateHandler<?>> handlers() {
+        return plugin.reservedClusterStateHandlers();
+    }
+}

+ 14 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java

@@ -68,6 +68,7 @@ import org.elasticsearch.plugins.SearchPlugin;
 import org.elasticsearch.plugins.SystemIndexPlugin;
 import org.elasticsearch.plugins.interceptor.RestInterceptorActionPlugin;
 import org.elasticsearch.repositories.RepositoriesService;
+import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.rest.RestHeaderDefinition;
@@ -207,6 +208,7 @@ import org.elasticsearch.xpack.security.action.role.TransportClearRolesCacheActi
 import org.elasticsearch.xpack.security.action.role.TransportDeleteRoleAction;
 import org.elasticsearch.xpack.security.action.role.TransportGetRolesAction;
 import org.elasticsearch.xpack.security.action.role.TransportPutRoleAction;
+import org.elasticsearch.xpack.security.action.rolemapping.ReservedRoleMappingAction;
 import org.elasticsearch.xpack.security.action.rolemapping.TransportDeleteRoleMappingAction;
 import org.elasticsearch.xpack.security.action.rolemapping.TransportGetRoleMappingsAction;
 import org.elasticsearch.xpack.security.action.rolemapping.TransportPutRoleMappingAction;
@@ -501,6 +503,8 @@ public class Security extends Plugin
     private final SetOnce<Transport> transportReference = new SetOnce<>();
     private final SetOnce<ScriptService> scriptServiceReference = new SetOnce<>();
 
+    private final SetOnce<ReservedRoleMappingAction> reservedRoleMappingAction = new SetOnce<>();
+
     public Security(Settings settings) {
         this(settings, Collections.emptyList());
     }
@@ -919,6 +923,8 @@ public class Security extends Plugin
 
         components.add(new SecurityUsageServices(realms, allRolesStore, nativeRoleMappingStore, ipFilter.get(), profileService));
 
+        reservedRoleMappingAction.set(new ReservedRoleMappingAction(nativeRoleMappingStore));
+
         cacheInvalidatorRegistry.validate();
 
         return components;
@@ -1706,4 +1712,12 @@ public class Security extends Plugin
         }
         return new DlsFlsRequestCacheDifferentiator(getLicenseState(), securityContext, scriptServiceReference);
     }
+
+    List<ReservedClusterStateHandler<?>> reservedClusterStateHandlers() {
+        // If security is disabled we never call the plugin createComponents
+        if (enabled == false) {
+            return Collections.emptyList();
+        }
+        return List.of(reservedRoleMappingAction.get());
+    }
 }

+ 147 - 0
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/rolemapping/ReservedRoleMappingAction.java

@@ -0,0 +1,147 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.action.rolemapping;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.GroupedActionListener;
+import org.elasticsearch.reservedstate.NonStateTransformResult;
+import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
+import org.elasticsearch.reservedstate.TransformState;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xpack.core.security.action.rolemapping.DeleteRoleMappingRequest;
+import org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingRequest;
+import org.elasticsearch.xpack.core.security.authc.support.mapper.ExpressionRoleMapping;
+import org.elasticsearch.xpack.security.authc.support.mapper.NativeRoleMappingStore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.common.xcontent.XContentHelper.mapToXContentParser;
+
+/**
+ * This Action is the reserved state save version of RestPutRoleMappingAction/RestDeleteRoleMappingAction
+ * <p>
+ * It is used by the ReservedClusterStateService to add/update or remove role mappings. Typical usage
+ * for this action is in the context of file based settings.
+ */
+public class ReservedRoleMappingAction implements ReservedClusterStateHandler<List<ExpressionRoleMapping>> {
+    public static final String NAME = "role_mappings";
+
+    private final NativeRoleMappingStore roleMappingStore;
+
+    /**
+     * Creates a ReservedRoleMappingAction
+     *
+     * @param roleMappingStore requires {@link NativeRoleMappingStore} for storing/deleting the mappings
+     */
+    public ReservedRoleMappingAction(NativeRoleMappingStore roleMappingStore) {
+        this.roleMappingStore = roleMappingStore;
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    private Collection<PutRoleMappingRequest> prepare(List<ExpressionRoleMapping> roleMappings) {
+        List<PutRoleMappingRequest> requests = roleMappings.stream().map(rm -> PutRoleMappingRequest.fromMapping(rm)).toList();
+
+        var exceptions = new ArrayList<Exception>();
+        for (var request : requests) {
+            // File based defined role mappings are allowed to use MetadataUtils.RESERVED_PREFIX
+            var exception = request.validate(false);
+            if (exception != null) {
+                exceptions.add(exception);
+            }
+        }
+
+        if (exceptions.isEmpty() == false) {
+            var illegalArgumentException = new IllegalArgumentException("error on validating put role mapping requests");
+            exceptions.forEach(illegalArgumentException::addSuppressed);
+            throw illegalArgumentException;
+        }
+
+        return requests;
+    }
+
+    @Override
+    public TransformState transform(Object source, TransformState prevState) throws Exception {
+        // We execute the prepare() call to catch any errors in the transform phase.
+        // Since we store the role mappings outside the cluster state, we do the actual save with a
+        // non cluster state transform call.
+        @SuppressWarnings("unchecked")
+        var requests = prepare((List<ExpressionRoleMapping>) source);
+        return new TransformState(prevState.state(), prevState.keys(), l -> nonStateTransform(requests, prevState, l));
+    }
+
+    private void nonStateTransform(
+        Collection<PutRoleMappingRequest> requests,
+        TransformState prevState,
+        ActionListener<NonStateTransformResult> listener
+    ) {
+        Set<String> entities = requests.stream().map(r -> r.getName()).collect(Collectors.toSet());
+        Set<String> toDelete = new HashSet<>(prevState.keys());
+        toDelete.removeAll(entities);
+
+        final int tasksCount = requests.size() + toDelete.size();
+
+        // Nothing to do, don't start a group listener with 0 actions
+        if (tasksCount == 0) {
+            listener.onResponse(new NonStateTransformResult(ReservedRoleMappingAction.NAME, Set.of()));
+            return;
+        }
+
+        GroupedActionListener<Boolean> taskListener = new GroupedActionListener<>(new ActionListener<>() {
+            @Override
+            public void onResponse(Collection<Boolean> booleans) {
+                listener.onResponse(new NonStateTransformResult(ReservedRoleMappingAction.NAME, Collections.unmodifiableSet(entities)));
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                listener.onFailure(e);
+            }
+        }, tasksCount);
+
+        for (var request : requests) {
+            roleMappingStore.putRoleMapping(request, taskListener);
+        }
+
+        for (var mappingToDelete : toDelete) {
+            var deleteRequest = new DeleteRoleMappingRequest();
+            deleteRequest.setName(mappingToDelete);
+            roleMappingStore.deleteRoleMapping(deleteRequest, taskListener);
+        }
+    }
+
+    @Override
+    public List<ExpressionRoleMapping> fromXContent(XContentParser parser) throws IOException {
+        List<ExpressionRoleMapping> result = new ArrayList<>();
+
+        Map<String, ?> source = parser.map();
+
+        for (String name : source.keySet()) {
+            @SuppressWarnings("unchecked")
+            Map<String, ?> content = (Map<String, ?>) source.get(name);
+            try (XContentParser mappingParser = mapToXContentParser(XContentParserConfiguration.EMPTY, content)) {
+                ExpressionRoleMapping mapping = ExpressionRoleMapping.parse(name, mappingParser);
+                result.add(mapping);
+            }
+        }
+
+        return result;
+    }
+}

+ 21 - 4
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/rolemapping/TransportDeleteRoleMappingAction.java

@@ -8,7 +8,8 @@ package org.elasticsearch.xpack.security.action.rolemapping;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.action.support.ReservedStateAwareHandledTransportAction;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
@@ -17,7 +18,12 @@ import org.elasticsearch.xpack.core.security.action.rolemapping.DeleteRoleMappin
 import org.elasticsearch.xpack.core.security.action.rolemapping.DeleteRoleMappingResponse;
 import org.elasticsearch.xpack.security.authc.support.mapper.NativeRoleMappingStore;
 
-public class TransportDeleteRoleMappingAction extends HandledTransportAction<DeleteRoleMappingRequest, DeleteRoleMappingResponse> {
+import java.util.Optional;
+import java.util.Set;
+
+public class TransportDeleteRoleMappingAction extends ReservedStateAwareHandledTransportAction<
+    DeleteRoleMappingRequest,
+    DeleteRoleMappingResponse> {
 
     private final NativeRoleMappingStore roleMappingStore;
 
@@ -25,17 +31,28 @@ public class TransportDeleteRoleMappingAction extends HandledTransportAction<Del
     public TransportDeleteRoleMappingAction(
         ActionFilters actionFilters,
         TransportService transportService,
+        ClusterService clusterService,
         NativeRoleMappingStore roleMappingStore
     ) {
-        super(DeleteRoleMappingAction.NAME, transportService, actionFilters, DeleteRoleMappingRequest::new);
+        super(DeleteRoleMappingAction.NAME, clusterService, transportService, actionFilters, DeleteRoleMappingRequest::new);
         this.roleMappingStore = roleMappingStore;
     }
 
     @Override
-    protected void doExecute(Task task, DeleteRoleMappingRequest request, ActionListener<DeleteRoleMappingResponse> listener) {
+    protected void doExecuteProtected(Task task, DeleteRoleMappingRequest request, ActionListener<DeleteRoleMappingResponse> listener) {
         roleMappingStore.deleteRoleMapping(
             request,
             listener.delegateFailure((l, found) -> l.onResponse(new DeleteRoleMappingResponse(found)))
         );
     }
+
+    @Override
+    public Optional<String> reservedStateHandlerName() {
+        return Optional.of(ReservedRoleMappingAction.NAME);
+    }
+
+    @Override
+    public Set<String> modifiedKeys(DeleteRoleMappingRequest request) {
+        return Set.of(request.getName());
+    }
 }

+ 23 - 4
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingAction.java

@@ -8,7 +8,8 @@ package org.elasticsearch.xpack.security.action.rolemapping;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.action.support.ReservedStateAwareHandledTransportAction;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
@@ -17,7 +18,10 @@ import org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingRe
 import org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingResponse;
 import org.elasticsearch.xpack.security.authc.support.mapper.NativeRoleMappingStore;
 
-public class TransportPutRoleMappingAction extends HandledTransportAction<PutRoleMappingRequest, PutRoleMappingResponse> {
+import java.util.Optional;
+import java.util.Set;
+
+public class TransportPutRoleMappingAction extends ReservedStateAwareHandledTransportAction<PutRoleMappingRequest, PutRoleMappingResponse> {
 
     private final NativeRoleMappingStore roleMappingStore;
 
@@ -25,17 +29,32 @@ public class TransportPutRoleMappingAction extends HandledTransportAction<PutRol
     public TransportPutRoleMappingAction(
         ActionFilters actionFilters,
         TransportService transportService,
+        ClusterService clusterService,
         NativeRoleMappingStore roleMappingStore
     ) {
-        super(PutRoleMappingAction.NAME, transportService, actionFilters, PutRoleMappingRequest::new);
+        super(PutRoleMappingAction.NAME, clusterService, transportService, actionFilters, PutRoleMappingRequest::new);
         this.roleMappingStore = roleMappingStore;
     }
 
     @Override
-    protected void doExecute(Task task, final PutRoleMappingRequest request, final ActionListener<PutRoleMappingResponse> listener) {
+    protected void doExecuteProtected(
+        Task task,
+        final PutRoleMappingRequest request,
+        final ActionListener<PutRoleMappingResponse> listener
+    ) {
         roleMappingStore.putRoleMapping(
             request,
             ActionListener.wrap(created -> listener.onResponse(new PutRoleMappingResponse(created)), listener::onFailure)
         );
     }
+
+    @Override
+    public Optional<String> reservedStateHandlerName() {
+        return Optional.of(ReservedRoleMappingAction.NAME);
+    }
+
+    @Override
+    public Set<String> modifiedKeys(PutRoleMappingRequest request) {
+        return Set.of(request.getName());
+    }
 }

+ 8 - 0
x-pack/plugin/security/src/main/resources/META-INF/services/org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider

@@ -0,0 +1,8 @@
+#
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the Elastic License
+# 2.0; you may not use this file except in compliance with the Elastic License
+# 2.0.
+#
+
+org.elasticsearch.xpack.security.ReservedSecurityStateHandlerProvider

+ 55 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/LocalReservedSecurityStateHandlerProvider.java

@@ -0,0 +1,55 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security;
+
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
+import org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+
+/**
+ * Mock Security Provider implementation for the {@link ReservedClusterStateHandlerProvider} service interface. This is used
+ * for {@link org.elasticsearch.test.ESIntegTestCase} because the Security Plugin is really LocalStateSecurity in those tests.
+ */
+public class LocalReservedSecurityStateHandlerProvider implements ReservedClusterStateHandlerProvider {
+    private final LocalStateSecurity plugin;
+
+    public LocalReservedSecurityStateHandlerProvider() {
+        throw new IllegalStateException("Provider must be constructed using PluginsService");
+    }
+
+    public LocalReservedSecurityStateHandlerProvider(LocalStateSecurity plugin) {
+        this.plugin = plugin;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        LocalReservedSecurityStateHandlerProvider that = (LocalReservedSecurityStateHandlerProvider) o;
+        return plugin.equals(that.plugin);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(plugin);
+    }
+
+    @Override
+    public Collection<ReservedClusterStateHandler<?>> handlers() {
+        for (Plugin subPlugin : plugin.plugins()) {
+            if (subPlugin instanceof Security security) {
+                return security.reservedClusterStateHandlers();
+            }
+        }
+        return Collections.emptyList();
+    }
+}

+ 5 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/LocalStateSecurity.java

@@ -15,6 +15,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.license.LicenseService;
 import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.protocol.xpack.XPackInfoRequest;
 import org.elasticsearch.protocol.xpack.XPackInfoResponse;
 import org.elasticsearch.protocol.xpack.XPackUsageRequest;
@@ -124,4 +125,8 @@ public class LocalStateSecurity extends LocalStateCompositeXPackPlugin {
     protected Class<? extends TransportAction<XPackInfoRequest, XPackInfoResponse>> getInfoAction() {
         return SecurityTransportXPackInfoAction.class;
     }
+
+    List<Plugin> plugins() {
+        return plugins;
+    }
 }

+ 245 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/reservedstate/ReservedRoleMappingActionTests.java

@@ -0,0 +1,245 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.action.reservedstate;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.common.ParsingException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.reservedstate.NonStateTransformResult;
+import org.elasticsearch.reservedstate.TransformState;
+import org.elasticsearch.script.ScriptService;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.XContentType;
+import org.elasticsearch.xpack.security.action.rolemapping.ReservedRoleMappingAction;
+import org.elasticsearch.xpack.security.authc.support.mapper.NativeRoleMappingStore;
+import org.elasticsearch.xpack.security.support.SecurityIndexManager;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Tests that the ReservedRoleMappingAction does validation, can add and remove role mappings
+ */
+public class ReservedRoleMappingActionTests extends ESTestCase {
+    private TransformState processJSON(ReservedRoleMappingAction action, TransformState prevState, String json) throws Exception {
+        try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) {
+            var content = action.fromXContent(parser);
+            var state = action.transform(content, prevState);
+
+            CountDownLatch latch = new CountDownLatch(1);
+            AtomicReference<Set<String>> updatedKeys = new AtomicReference<>();
+            AtomicReference<Exception> error = new AtomicReference<>();
+            state.nonStateTransform().accept(new ActionListener<>() {
+                @Override
+                public void onResponse(NonStateTransformResult nonStateTransformResult) {
+                    updatedKeys.set(nonStateTransformResult.updatedKeys());
+                    latch.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    error.set(e);
+                    latch.countDown();
+                }
+            });
+
+            latch.await();
+            if (error.get() != null) {
+                throw error.get();
+            }
+            return new TransformState(state.state(), updatedKeys.get());
+        }
+    }
+
+    public void testValidation() {
+        var nativeRoleMappingStore = mockNativeRoleMappingStore();
+
+        ClusterState state = ClusterState.builder(new ClusterName("elasticsearch")).build();
+        TransformState prevState = new TransformState(state, Collections.emptySet());
+        ReservedRoleMappingAction action = new ReservedRoleMappingAction(nativeRoleMappingStore);
+
+        String badPolicyJSON = """
+            {
+               "everyone_kibana": {
+                  "enabled": true,
+                  "roles": [ "inter_planetary_role" ],
+                  "rules": { "field": { "username": "*" } },
+                  "metadata": {
+                     "uuid" : "b9a59ba9-6b92-4be2-bb8d-02bb270cb3a7",
+                     "_reserved": true
+                  }
+               },
+               "everyone_fleet": {
+                  "enabled": true,
+                  "roles": [ "fleet_user" ],
+                  "metadata": {
+                     "uuid" : "b9a59ba9-6b92-4be3-bb8d-02bb270cb3a7"
+                  }
+               }
+            }""";
+
+        assertEquals(
+            "failed to parse role-mapping [everyone_fleet]. missing field [rules]",
+            expectThrows(ParsingException.class, () -> processJSON(action, prevState, badPolicyJSON)).getMessage()
+        );
+    }
+
+    public void testAddRemoveRoleMapping() throws Exception {
+        var nativeRoleMappingStore = mockNativeRoleMappingStore();
+
+        ClusterState state = ClusterState.builder(new ClusterName("elasticsearch")).build();
+        TransformState prevState = new TransformState(state, Collections.emptySet());
+        ReservedRoleMappingAction action = new ReservedRoleMappingAction(nativeRoleMappingStore);
+
+        String emptyJSON = "";
+
+        TransformState updatedState = processJSON(action, prevState, emptyJSON);
+        assertEquals(0, updatedState.keys().size());
+        assertEquals(prevState.state(), updatedState.state());
+
+        String json = """
+            {
+               "everyone_kibana": {
+                  "enabled": true,
+                  "roles": [ "kibana_user" ],
+                  "rules": { "field": { "username": "*" } },
+                  "metadata": {
+                     "uuid" : "b9a59ba9-6b92-4be2-bb8d-02bb270cb3a7",
+                     "_reserved": true
+                  }
+               },
+               "everyone_fleet": {
+                  "enabled": true,
+                  "roles": [ "fleet_user" ],
+                  "rules": { "field": { "username": "*" } },
+                  "metadata": {
+                     "uuid" : "b9a59ba9-6b92-4be3-bb8d-02bb270cb3a7",
+                     "_reserved": true
+                  }
+               }
+            }""";
+
+        prevState = updatedState;
+        updatedState = processJSON(action, prevState, json);
+        assertThat(updatedState.keys(), containsInAnyOrder("everyone_kibana", "everyone_fleet"));
+
+        updatedState = processJSON(action, prevState, emptyJSON);
+        assertThat(updatedState.keys(), empty());
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testNonStateTransformWaitsOnAsyncActions() throws Exception {
+        var nativeRoleMappingStore = mockNativeRoleMappingStore();
+
+        doAnswer(invocation -> {
+            new Thread(() -> {
+                // Simulate put role mapping async action taking a while
+                try {
+                    Thread.sleep(1_000);
+                    ((ActionListener<Boolean>) invocation.getArgument(1)).onFailure(new IllegalStateException("err_done"));
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }).start();
+
+            return null;
+        }).when(nativeRoleMappingStore).putRoleMapping(any(), any());
+
+        doAnswer(invocation -> {
+            new Thread(() -> {
+                // Simulate delete role mapping async action taking a while
+                try {
+                    Thread.sleep(1_000);
+                    ((ActionListener<Boolean>) invocation.getArgument(1)).onFailure(new IllegalStateException("err_done"));
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }).start();
+
+            return null;
+        }).when(nativeRoleMappingStore).deleteRoleMapping(any(), any());
+
+        ClusterState state = ClusterState.builder(new ClusterName("elasticsearch")).build();
+        TransformState updatedState = new TransformState(state, Collections.emptySet());
+        ReservedRoleMappingAction action = new ReservedRoleMappingAction(nativeRoleMappingStore);
+
+        String json = """
+            {
+               "everyone_kibana": {
+                  "enabled": true,
+                  "roles": [ "kibana_user" ],
+                  "rules": { "field": { "username": "*" } },
+                  "metadata": {
+                     "uuid" : "b9a59ba9-6b92-4be2-bb8d-02bb270cb3a7",
+                     "_reserved": true
+                  }
+               },
+               "everyone_fleet": {
+                  "enabled": true,
+                  "roles": [ "fleet_user" ],
+                  "rules": { "field": { "username": "*" } },
+                  "metadata": {
+                     "uuid" : "a9a59ba9-6b92-4be2-bb8d-02bb270cb3a7",
+                     "_reserved": true
+                  }
+               }
+            }""";
+
+        assertEquals(
+            "err_done",
+            expectThrows(IllegalStateException.class, () -> processJSON(action, new TransformState(state, Collections.emptySet()), json))
+                .getMessage()
+        );
+
+        // Now that we've tested that we wait on putRoleMapping correctly, let it finish without exception, so we can test error on delete
+        doAnswer(invocation -> {
+            ((ActionListener<Boolean>) invocation.getArgument(1)).onResponse(true);
+            return null;
+        }).when(nativeRoleMappingStore).putRoleMapping(any(), any());
+
+        updatedState = processJSON(action, updatedState, json);
+        assertThat(updatedState.keys(), containsInAnyOrder("everyone_kibana", "everyone_fleet"));
+
+        final TransformState currentState = new TransformState(updatedState.state(), updatedState.keys());
+
+        assertEquals("err_done", expectThrows(IllegalStateException.class, () -> processJSON(action, currentState, "")).getMessage());
+    }
+
+    @SuppressWarnings("unchecked")
+    private NativeRoleMappingStore mockNativeRoleMappingStore() {
+        final NativeRoleMappingStore nativeRoleMappingStore = spy(
+            new NativeRoleMappingStore(Settings.EMPTY, mock(Client.class), mock(SecurityIndexManager.class), mock(ScriptService.class))
+        );
+
+        doAnswer(invocation -> {
+            ((ActionListener<Boolean>) invocation.getArgument(1)).onResponse(true);
+            return null;
+        }).when(nativeRoleMappingStore).putRoleMapping(any(), any());
+
+        doAnswer(invocation -> {
+            ((ActionListener<Boolean>) invocation.getArgument(1)).onResponse(true);
+            return null;
+        }).when(nativeRoleMappingStore).deleteRoleMapping(any(), any());
+
+        return nativeRoleMappingStore;
+    }
+}

+ 45 - 0
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportDeleteRoleMappingActionTests.java

@@ -0,0 +1,45 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.security.action.rolemapping;
+
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.security.action.rolemapping.DeleteRoleMappingRequest;
+import org.elasticsearch.xpack.security.authc.support.mapper.NativeRoleMappingStore;
+
+import java.util.Collections;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.mockito.Mockito.mock;
+
+public class TransportDeleteRoleMappingActionTests extends ESTestCase {
+    public void testReservedStateHandler() {
+        var store = mock(NativeRoleMappingStore.class);
+        TransportService transportService = new TransportService(
+            Settings.EMPTY,
+            mock(Transport.class),
+            mock(ThreadPool.class),
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR,
+            x -> null,
+            null,
+            Collections.emptySet()
+        );
+        var action = new TransportDeleteRoleMappingAction(mock(ActionFilters.class), transportService, mock(ClusterService.class), store);
+
+        assertEquals(ReservedRoleMappingAction.NAME, action.reservedStateHandlerName().get());
+
+        var deleteRequest = new DeleteRoleMappingRequest();
+        deleteRequest.setName("kibana_all");
+        assertThat(action.modifiedKeys(deleteRequest), containsInAnyOrder("kibana_all"));
+    }
+}

+ 39 - 2
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/action/rolemapping/TransportPutRoleMappingActionTests.java

@@ -9,12 +9,16 @@ package org.elasticsearch.xpack.security.action.rolemapping;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingRequest;
 import org.elasticsearch.xpack.core.security.action.rolemapping.PutRoleMappingResponse;
 import org.elasticsearch.xpack.core.security.authc.support.mapper.ExpressionRoleMapping;
@@ -29,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.iterableWithSize;
@@ -55,7 +60,7 @@ public class TransportPutRoleMappingActionTests extends ESTestCase {
             null,
             Collections.emptySet()
         );
-        action = new TransportPutRoleMappingAction(mock(ActionFilters.class), transportService, store);
+        action = new TransportPutRoleMappingAction(mock(ActionFilters.class), transportService, mock(ClusterService.class), store);
 
         requestRef = new AtomicReference<>(null);
 
@@ -94,7 +99,39 @@ public class TransportPutRoleMappingActionTests extends ESTestCase {
         request.setMetadata(metadata);
         request.setEnabled(true);
         final PlainActionFuture<PutRoleMappingResponse> future = new PlainActionFuture<>();
-        action.doExecute(mock(Task.class), request, future);
+        action.doExecuteProtected(mock(Task.class), request, future);
         return future.get();
     }
+
+    public void testReservedStateHandler() throws Exception {
+        assertEquals(ReservedRoleMappingAction.NAME, action.reservedStateHandlerName().get());
+        String json = """
+            {
+               "everyone_kibana": {
+                  "enabled": true,
+                  "roles": [ "kibana_user" ],
+                  "rules": { "field": { "username": "*" } },
+                  "metadata": {
+                     "uuid" : "b9a59ba9-6b92-4be2-bb8d-02bb270cb3a7"
+                  }
+               },
+               "everyone_fleet": {
+                  "enabled": true,
+                  "roles": [ "fleet_user" ],
+                  "rules": { "field": { "username": "*" } },
+                  "metadata": {
+                     "uuid" : "b9a59ba9-6b92-4be3-bb8d-02bb270cb3a7"
+                  }
+               }
+            }""";
+
+        try (XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, json)) {
+            ReservedRoleMappingAction roleMappingAction = new ReservedRoleMappingAction(store);
+            var parsedResult = roleMappingAction.fromXContent(parser);
+
+            for (var mapping : parsedResult) {
+                assertThat(action.modifiedKeys(PutRoleMappingRequest.fromMapping(mapping)), containsInAnyOrder(mapping.getName()));
+            }
+        }
+    }
 }

+ 8 - 0
x-pack/plugin/security/src/test/resources/META-INF/services/org.elasticsearch.reservedstate.ReservedClusterStateHandlerProvider

@@ -0,0 +1,8 @@
+#
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the Elastic License
+# 2.0; you may not use this file except in compliance with the Elastic License
+# 2.0.
+#
+
+org.elasticsearch.xpack.security.LocalReservedSecurityStateHandlerProvider