瀏覽代碼

Extract transport cluster settings/ilm execute logic (#86941)

Extract execute logic from the transport actions for cluster
update settings and ILM put/delete to support future reuse for
operator file based updates.

Relates to #86224
Nikola Grcevski 3 年之前
父節點
當前提交
fbf335dcf1

+ 33 - 17
server/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

@@ -13,6 +13,7 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterState;
@@ -137,11 +138,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
         final ClusterState state,
         final ClusterState state,
         final ActionListener<ClusterUpdateSettingsResponse> listener
         final ActionListener<ClusterUpdateSettingsResponse> listener
     ) {
     ) {
-        final SettingsUpdater updater = new SettingsUpdater(clusterSettings);
-        submitUnbatchedTask(UPDATE_TASK_SOURCE, new AckedClusterStateUpdateTask(Priority.IMMEDIATE, request, listener) {
-
-            private volatile boolean changed = false;
-
+        submitUnbatchedTask(UPDATE_TASK_SOURCE, new ClusterUpdateSettingsTask(clusterSettings, Priority.IMMEDIATE, request, listener) {
             @Override
             @Override
             protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
             protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) {
                 return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
                 return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate());
@@ -225,21 +222,40 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeAct
                 logger.debug(() -> "failed to perform [" + UPDATE_TASK_SOURCE + "]", e);
                 logger.debug(() -> "failed to perform [" + UPDATE_TASK_SOURCE + "]", e);
                 super.onFailure(e);
                 super.onFailure(e);
             }
             }
-
-            @Override
-            public ClusterState execute(final ClusterState currentState) {
-                final ClusterState clusterState = updater.updateSettings(
-                    currentState,
-                    clusterSettings.upgradeSettings(request.transientSettings()),
-                    clusterSettings.upgradeSettings(request.persistentSettings()),
-                    logger
-                );
-                changed = clusterState != currentState;
-                return clusterState;
-            }
         });
         });
     }
     }
 
 
+    public static class ClusterUpdateSettingsTask extends AckedClusterStateUpdateTask {
+        protected volatile boolean changed = false;
+        protected final SettingsUpdater updater;
+        protected final ClusterUpdateSettingsRequest request;
+        private final ClusterSettings clusterSettings;
+
+        public ClusterUpdateSettingsTask(
+            final ClusterSettings clusterSettings,
+            Priority priority,
+            ClusterUpdateSettingsRequest request,
+            ActionListener<? extends AcknowledgedResponse> listener
+        ) {
+            super(priority, request, listener);
+            this.clusterSettings = clusterSettings;
+            this.updater = new SettingsUpdater(clusterSettings);
+            this.request = request;
+        }
+
+        @Override
+        public ClusterState execute(final ClusterState currentState) {
+            final ClusterState clusterState = updater.updateSettings(
+                currentState,
+                clusterSettings.upgradeSettings(request.transientSettings()),
+                clusterSettings.upgradeSettings(request.persistentSettings()),
+                logger
+            );
+            changed = clusterState != currentState;
+            return clusterState;
+        }
+    }
+
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
     private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
     private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
         clusterService.submitUnbatchedStateUpdateTask(source, task);
         clusterService.submitUnbatchedStateUpdateTask(source, task);

+ 36 - 27
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportDeleteLifecycleAction.java

@@ -59,34 +59,43 @@ public class TransportDeleteLifecycleAction extends TransportMasterNodeAction<Re
 
 
     @Override
     @Override
     protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
     protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
-        submitUnbatchedTask("delete-lifecycle-" + request.getPolicyName(), new AckedClusterStateUpdateTask(request, listener) {
-            @Override
-            public ClusterState execute(ClusterState currentState) {
-                String policyToDelete = request.getPolicyName();
-                List<String> indicesUsingPolicy = currentState.metadata()
-                    .indices()
-                    .values()
-                    .stream()
-                    .filter(idxMeta -> policyToDelete.equals(idxMeta.getLifecyclePolicyName()))
-                    .map(idxMeta -> idxMeta.getIndex().getName())
-                    .toList();
-                if (indicesUsingPolicy.isEmpty() == false) {
-                    throw new IllegalArgumentException(
-                        "Cannot delete policy [" + request.getPolicyName() + "]. It is in use by one or more indices: " + indicesUsingPolicy
-                    );
-                }
-                ClusterState.Builder newState = ClusterState.builder(currentState);
-                IndexLifecycleMetadata currentMetadata = currentState.metadata().custom(IndexLifecycleMetadata.TYPE);
-                if (currentMetadata == null || currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) {
-                    throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName());
-                }
-                SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
-                newPolicies.remove(request.getPolicyName());
-                IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
-                newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
-                return newState.build();
+        submitUnbatchedTask("delete-lifecycle-" + request.getPolicyName(), new DeleteLifecyclePolicyTask(request, listener));
+    }
+
+    public static class DeleteLifecyclePolicyTask extends AckedClusterStateUpdateTask {
+        private final Request request;
+
+        public DeleteLifecyclePolicyTask(Request request, ActionListener<AcknowledgedResponse> listener) {
+            super(request, listener);
+            this.request = request;
+        }
+
+        @Override
+        public ClusterState execute(ClusterState currentState) {
+            String policyToDelete = request.getPolicyName();
+            List<String> indicesUsingPolicy = currentState.metadata()
+                .indices()
+                .values()
+                .stream()
+                .filter(idxMeta -> policyToDelete.equals(idxMeta.getLifecyclePolicyName()))
+                .map(idxMeta -> idxMeta.getIndex().getName())
+                .toList();
+            if (indicesUsingPolicy.isEmpty() == false) {
+                throw new IllegalArgumentException(
+                    "Cannot delete policy [" + request.getPolicyName() + "]. It is in use by one or more indices: " + indicesUsingPolicy
+                );
+            }
+            ClusterState.Builder newState = ClusterState.builder(currentState);
+            IndexLifecycleMetadata currentMetadata = currentState.metadata().custom(IndexLifecycleMetadata.TYPE);
+            if (currentMetadata == null || currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) {
+                throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName());
             }
             }
-        });
+            SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
+            newPolicies.remove(request.getPolicyName());
+            IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
+            newState.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
+            return newState.build();
+        }
     }
     }
 
 
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here

+ 74 - 50
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportPutLifecycleAction.java

@@ -109,61 +109,85 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction<Reque
             }
             }
         }
         }
 
 
-        submitUnbatchedTask("put-lifecycle-" + request.getPolicy().getName(), new AckedClusterStateUpdateTask(request, listener) {
-            @Override
-            public ClusterState execute(ClusterState currentState) throws Exception {
-                final IndexLifecycleMetadata currentMetadata = currentState.metadata()
-                    .custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
-                final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas()
-                    .get(request.getPolicy().getName());
+        submitUnbatchedTask(
+            "put-lifecycle-" + request.getPolicy().getName(),
+            new UpdateLifecyclePolicyTask(request, listener, licenseState, filteredHeaders, xContentRegistry, client)
+        );
+    }
 
 
-                // Double-check for no-op in the state update task, in case it was changed/reset in the meantime
-                if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) {
-                    return currentState;
-                }
+    public static class UpdateLifecyclePolicyTask extends AckedClusterStateUpdateTask {
+        private final Request request;
+        private final XPackLicenseState licenseState;
+        private final Map<String, String> filteredHeaders;
+        private final NamedXContentRegistry xContentRegistry;
+        private final Client client;
 
 
-                validatePrerequisites(request.getPolicy(), currentState);
+        public UpdateLifecyclePolicyTask(
+            Request request,
+            ActionListener<AcknowledgedResponse> listener,
+            XPackLicenseState licenseState,
+            Map<String, String> filteredHeaders,
+            NamedXContentRegistry xContentRegistry,
+            Client client
+        ) {
+            super(request, listener);
+            this.request = request;
+            this.licenseState = licenseState;
+            this.filteredHeaders = filteredHeaders;
+            this.xContentRegistry = xContentRegistry;
+            this.client = client;
+        }
 
 
-                ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
-                long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L;
-                SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
-                LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(
-                    request.getPolicy(),
-                    filteredHeaders,
-                    nextVersion,
-                    Instant.now().toEpochMilli()
-                );
-                LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
-                if (oldPolicy == null) {
-                    logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
-                } else {
-                    logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
-                }
-                IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
-                stateBuilder.metadata(
-                    Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build()
-                );
-                ClusterState nonRefreshedState = stateBuilder.build();
-                if (oldPolicy == null) {
+        @Override
+        public ClusterState execute(ClusterState currentState) throws Exception {
+            final IndexLifecycleMetadata currentMetadata = currentState.metadata()
+                .custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
+            final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas().get(request.getPolicy().getName());
+
+            // Double-check for no-op in the state update task, in case it was changed/reset in the meantime
+            if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) {
+                return currentState;
+            }
+
+            validatePrerequisites(request.getPolicy(), currentState, licenseState);
+
+            ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
+            long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L;
+            SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
+            LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(
+                request.getPolicy(),
+                filteredHeaders,
+                nextVersion,
+                Instant.now().toEpochMilli()
+            );
+            LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
+            if (oldPolicy == null) {
+                logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
+            } else {
+                logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
+            }
+            IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
+            stateBuilder.metadata(Metadata.builder(currentState.getMetadata()).putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
+            ClusterState nonRefreshedState = stateBuilder.build();
+            if (oldPolicy == null) {
+                return nonRefreshedState;
+            } else {
+                try {
+                    return updateIndicesForPolicy(
+                        nonRefreshedState,
+                        xContentRegistry,
+                        client,
+                        oldPolicy.getPolicy(),
+                        lifecyclePolicyMetadata,
+                        licenseState
+                    );
+                } catch (Exception e) {
+                    logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e);
+                    // Revert to the non-refreshed state
                     return nonRefreshedState;
                     return nonRefreshedState;
-                } else {
-                    try {
-                        return updateIndicesForPolicy(
-                            nonRefreshedState,
-                            xContentRegistry,
-                            client,
-                            oldPolicy.getPolicy(),
-                            lifecyclePolicyMetadata,
-                            licenseState
-                        );
-                    } catch (Exception e) {
-                        logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e);
-                        // Revert to the non-refreshed state
-                        return nonRefreshedState;
-                    }
                 }
                 }
             }
             }
-        });
+        }
     }
     }
 
 
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
     @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
@@ -193,7 +217,7 @@ public class TransportPutLifecycleAction extends TransportMasterNodeAction<Reque
      * @param policy The lifecycle policy
      * @param policy The lifecycle policy
      * @param state The cluster state
      * @param state The cluster state
      */
      */
-    private void validatePrerequisites(LifecyclePolicy policy, ClusterState state) {
+    private static void validatePrerequisites(LifecyclePolicy policy, ClusterState state, XPackLicenseState licenseState) {
         List<Phase> phasesWithSearchableSnapshotActions = policy.getPhases()
         List<Phase> phasesWithSearchableSnapshotActions = policy.getPhases()
             .values()
             .values()
             .stream()
             .stream()