Selaa lähdekoodia

Remove oldState from start-license tasks (#85993)

Relying on the original cluster state during publication completion
handling is incompatible with proper batching. The `LicenseService` uses
the old state to work out how to respond to the client. With this commit
we capture the proper response during task execution itself, removing
the reliance on the original cluster state in the completion handler.
David Turner 3 vuotta sitten
vanhempi
commit
a2f7a8239e

+ 23 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java

@@ -15,9 +15,11 @@ import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.hash.MessageDigests;
@@ -134,6 +136,9 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
      */
     private final List<License.LicenseType> allowedLicenseTypes;
 
+    private final StartTrialClusterTask.Executor startTrialExecutor = new StartTrialClusterTask.Executor();
+    private final StartBasicClusterTask.Executor startBasicExecutor = new StartBasicClusterTask.Executor();
+
     /**
      * Max number of nodes licensed by generated trial license
      */
@@ -398,7 +403,12 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
             "delete license",
             listener
         );
-        submitUnbatchedTask(task.getDescription(), task);
+        clusterService.submitStateUpdateTask(
+            task.getDescription(),
+            task,
+            ClusterStateTaskConfig.build(Priority.NORMAL), // TODO should pass in request.masterNodeTimeout() here
+            startBasicExecutor
+        );
     }
 
     public License getLicense() {
@@ -421,8 +431,12 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
                     + "]"
             );
         }
-        StartTrialClusterTask task = new StartTrialClusterTask(logger, clusterService.getClusterName().value(), clock, request, listener);
-        submitUnbatchedTask(StartTrialClusterTask.TASK_SOURCE, task);
+        clusterService.submitStateUpdateTask(
+            StartTrialClusterTask.TASK_SOURCE,
+            new StartTrialClusterTask(logger, clusterService.getClusterName().value(), clock, request, listener),
+            ClusterStateTaskConfig.build(Priority.NORMAL), // TODO should pass in request.masterNodeTimeout() here
+            startTrialExecutor
+        );
     }
 
     void startBasicLicense(PostStartBasicRequest request, final ActionListener<PostStartBasicResponse> listener) {
@@ -434,7 +448,12 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
             "start basic license",
             listener
         );
-        submitUnbatchedTask(task.getDescription(), task);
+        clusterService.submitStateUpdateTask(
+            task.getDescription(),
+            task,
+            ClusterStateTaskConfig.build(Priority.NORMAL), // TODO should pass in request.masterNodeTimeout() here
+            startBasicExecutor
+        );
     }
 
     /**

+ 58 - 32
x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java

@@ -11,18 +11,19 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.core.XPackPlugin;
 
 import java.time.Clock;
-import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
 
-public class StartBasicClusterTask extends ClusterStateUpdateTask {
+public class StartBasicClusterTask implements ClusterStateTaskListener {
 
     private static final String ACKNOWLEDGEMENT_HEADER = "This license update requires acknowledgement. To acknowledge the license, "
         + "please read the following messages and call /start_basic again, this time with the \"acknowledge=true\" parameter:";
@@ -33,7 +34,6 @@ public class StartBasicClusterTask extends ClusterStateUpdateTask {
     private final String description;
     private final ActionListener<PostStartBasicResponse> listener;
     private final Clock clock;
-    private AtomicReference<Map<String, String[]>> ackMessages = new AtomicReference<>(Collections.emptyMap());
 
     StartBasicClusterTask(
         Logger logger,
@@ -53,43 +53,50 @@ public class StartBasicClusterTask extends ClusterStateUpdateTask {
 
     @Override
     public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-        LicensesMetadata oldLicensesMetadata = oldState.metadata().custom(LicensesMetadata.TYPE);
-        logger.debug("license prior to starting basic license: {}", oldLicensesMetadata);
-        License oldLicense = LicensesMetadata.extractLicense(oldLicensesMetadata);
-        Map<String, String[]> acknowledgeMessages = ackMessages.get();
-        if (acknowledgeMessages.isEmpty() == false) {
-            listener.onResponse(
-                new PostStartBasicResponse(PostStartBasicResponse.Status.NEED_ACKNOWLEDGEMENT, acknowledgeMessages, ACKNOWLEDGEMENT_HEADER)
-            );
-        } else if (oldLicense != null && License.LicenseType.isBasic(oldLicense.type())) {
-            listener.onResponse(new PostStartBasicResponse(PostStartBasicResponse.Status.ALREADY_USING_BASIC));
-        } else {
-            listener.onResponse(new PostStartBasicResponse(PostStartBasicResponse.Status.GENERATED_BASIC));
-        }
+        assert false : "never called";
     }
 
-    @Override
-    public ClusterState execute(ClusterState currentState) throws Exception {
-        XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
-        LicensesMetadata currentLicensesMetadata = currentState.metadata().custom(LicensesMetadata.TYPE);
+    public LicensesMetadata execute(
+        LicensesMetadata currentLicensesMetadata,
+        DiscoveryNodes discoveryNodes,
+        ClusterStateTaskExecutor.TaskContext<StartBasicClusterTask> taskContext
+    ) throws Exception {
+        assert taskContext.getTask() == this;
+        final var listener = ActionListener.runBefore(
+            this.listener,
+            () -> logger.debug("license prior to starting basic license: {}", currentLicensesMetadata)
+        );
         License currentLicense = LicensesMetadata.extractLicense(currentLicensesMetadata);
+        final LicensesMetadata updatedLicensesMetadata;
         if (shouldGenerateNewBasicLicense(currentLicense)) {
-            License selfGeneratedLicense = generateBasicLicense(currentState);
+            License selfGeneratedLicense = generateBasicLicense(discoveryNodes);
             if (request.isAcknowledged() == false && currentLicense != null) {
                 Map<String, String[]> ackMessageMap = LicenseService.getAckMessages(selfGeneratedLicense, currentLicense);
                 if (ackMessageMap.isEmpty() == false) {
-                    this.ackMessages.set(ackMessageMap);
-                    return currentState;
+                    taskContext.success(
+                        listener.delegateFailure(
+                            (delegate, ignored) -> delegate.onResponse(
+                                new PostStartBasicResponse(
+                                    PostStartBasicResponse.Status.NEED_ACKNOWLEDGEMENT,
+                                    ackMessageMap,
+                                    ACKNOWLEDGEMENT_HEADER
+                                )
+                            )
+                        )
+                    );
+                    return currentLicensesMetadata;
                 }
             }
             Version trialVersion = currentLicensesMetadata != null ? currentLicensesMetadata.getMostRecentTrialVersion() : null;
-            LicensesMetadata newLicensesMetadata = new LicensesMetadata(selfGeneratedLicense, trialVersion);
-            Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
-            mdBuilder.putCustom(LicensesMetadata.TYPE, newLicensesMetadata);
-            return ClusterState.builder(currentState).metadata(mdBuilder).build();
+            updatedLicensesMetadata = new LicensesMetadata(selfGeneratedLicense, trialVersion);
         } else {
-            return currentState;
+            updatedLicensesMetadata = currentLicensesMetadata;
         }
+        final var responseStatus = currentLicense != null && License.LicenseType.isBasic(currentLicense.type())
+            ? PostStartBasicResponse.Status.ALREADY_USING_BASIC
+            : PostStartBasicResponse.Status.GENERATED_BASIC;
+        taskContext.success(listener.delegateFailure((l, s) -> l.onResponse(new PostStartBasicResponse(responseStatus))));
+        return updatedLicensesMetadata;
     }
 
     @Override
@@ -105,7 +112,7 @@ public class StartBasicClusterTask extends ClusterStateUpdateTask {
             || LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS != LicenseService.getExpiryDate(currentLicense);
     }
 
-    private License generateBasicLicense(ClusterState currentState) {
+    private License generateBasicLicense(DiscoveryNodes discoveryNodes) {
         final License.Builder specBuilder = License.builder()
             .uid(UUID.randomUUID().toString())
             .issuedTo(clusterName)
@@ -114,10 +121,29 @@ public class StartBasicClusterTask extends ClusterStateUpdateTask {
             .type(License.LicenseType.BASIC)
             .expiryDate(LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS);
 
-        return SelfGeneratedLicense.create(specBuilder, currentState.nodes());
+        return SelfGeneratedLicense.create(specBuilder, discoveryNodes);
     }
 
     public String getDescription() {
         return description;
     }
+
+    static class Executor implements ClusterStateTaskExecutor<StartBasicClusterTask> {
+        @Override
+        public ClusterState execute(ClusterState currentState, List<TaskContext<StartBasicClusterTask>> taskContexts) throws Exception {
+            XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
+            final LicensesMetadata originalLicensesMetadata = currentState.metadata().custom(LicensesMetadata.TYPE);
+            var currentLicensesMetadata = originalLicensesMetadata;
+            for (final var taskContext : taskContexts) {
+                currentLicensesMetadata = taskContext.getTask().execute(currentLicensesMetadata, currentState.nodes(), taskContext);
+            }
+            if (currentLicensesMetadata == originalLicensesMetadata) {
+                return currentState;
+            } else {
+                return ClusterState.builder(currentState)
+                    .metadata(Metadata.builder(currentState.metadata()).putCustom(LicensesMetadata.TYPE, currentLicensesMetadata))
+                    .build();
+            }
+        }
+    }
 }

+ 58 - 25
x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java

@@ -10,17 +10,20 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.xpack.core.XPackPlugin;
 
 import java.time.Clock;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-public class StartTrialClusterTask extends ClusterStateUpdateTask {
+public class StartTrialClusterTask implements ClusterStateTaskListener {
 
     private static final String ACKNOWLEDGEMENT_HEADER = "This API initiates a free 30-day trial for all platinum features. "
         + "By starting this trial, you agree that it is subject to the terms and conditions at"
@@ -56,30 +59,30 @@ public class StartTrialClusterTask extends ClusterStateUpdateTask {
 
     @Override
     public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-        LicensesMetadata oldLicensesMetadata = oldState.metadata().custom(LicensesMetadata.TYPE);
-        logger.debug("started self generated trial license: {}", oldLicensesMetadata);
-
-        if (request.isAcknowledged() == false) {
-            listener.onResponse(
-                new PostStartTrialResponse(PostStartTrialResponse.Status.NEED_ACKNOWLEDGEMENT, ACK_MESSAGES, ACKNOWLEDGEMENT_HEADER)
-            );
-        } else if (oldLicensesMetadata == null || oldLicensesMetadata.isEligibleForTrial()) {
-            listener.onResponse(new PostStartTrialResponse(PostStartTrialResponse.Status.UPGRADED_TO_TRIAL));
-        } else {
-            listener.onResponse(new PostStartTrialResponse(PostStartTrialResponse.Status.TRIAL_ALREADY_ACTIVATED));
-        }
+        assert false : "never called";
     }
 
-    @Override
-    public ClusterState execute(ClusterState currentState) throws Exception {
-        XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
-        LicensesMetadata currentLicensesMetadata = currentState.metadata().custom(LicensesMetadata.TYPE);
-
+    private LicensesMetadata execute(
+        LicensesMetadata currentLicensesMetadata,
+        DiscoveryNodes discoveryNodes,
+        ClusterStateTaskExecutor.TaskContext<StartTrialClusterTask> taskContext
+    ) {
+        assert taskContext.getTask() == this;
+        final var listener = ActionListener.runBefore(
+            this.listener,
+            () -> { logger.debug("started self generated trial license: {}", currentLicensesMetadata); }
+        );
         if (request.isAcknowledged() == false) {
-            return currentState;
+            taskContext.success(
+                listener.delegateFailure(
+                    (l, s) -> l.onResponse(
+                        new PostStartTrialResponse(PostStartTrialResponse.Status.NEED_ACKNOWLEDGEMENT, ACK_MESSAGES, ACKNOWLEDGEMENT_HEADER)
+                    )
+                )
+            );
+            return currentLicensesMetadata;
         } else if (currentLicensesMetadata == null || currentLicensesMetadata.isEligibleForTrial()) {
             long issueDate = clock.millis();
-            Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
             long expiryDate = issueDate + LicenseService.NON_BASIC_SELF_GENERATED_LICENSE_DURATION.getMillis();
 
             License.Builder specBuilder = License.builder()
@@ -93,12 +96,21 @@ public class StartTrialClusterTask extends ClusterStateUpdateTask {
             } else {
                 specBuilder.maxNodes(LicenseService.SELF_GENERATED_LICENSE_MAX_NODES);
             }
-            License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
+            License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, discoveryNodes);
             LicensesMetadata newLicensesMetadata = new LicensesMetadata(selfGeneratedLicense, Version.CURRENT);
-            mdBuilder.putCustom(LicensesMetadata.TYPE, newLicensesMetadata);
-            return ClusterState.builder(currentState).metadata(mdBuilder).build();
+            taskContext.success(
+                listener.delegateFailure(
+                    (delegate, ignored) -> delegate.onResponse(new PostStartTrialResponse(PostStartTrialResponse.Status.UPGRADED_TO_TRIAL))
+                )
+            );
+            return newLicensesMetadata;
         } else {
-            return currentState;
+            taskContext.success(
+                listener.delegateFailure(
+                    (l, s) -> l.onResponse(new PostStartTrialResponse(PostStartTrialResponse.Status.TRIAL_ALREADY_ACTIVATED))
+                )
+            );
+            return currentLicensesMetadata;
         }
     }
 
@@ -107,4 +119,25 @@ public class StartTrialClusterTask extends ClusterStateUpdateTask {
         logger.error("unexpected failure during [" + TASK_SOURCE + "]", e);
         listener.onFailure(e);
     }
+
+    static class Executor implements ClusterStateTaskExecutor<StartTrialClusterTask> {
+
+        @Override
+        public ClusterState execute(ClusterState currentState, List<TaskContext<StartTrialClusterTask>> taskContexts) throws Exception {
+            XPackPlugin.checkReadyForXPackCustomMetadata(currentState);
+            final LicensesMetadata originalLicensesMetadata = currentState.metadata().custom(LicensesMetadata.TYPE);
+            var currentLicensesMetadata = originalLicensesMetadata;
+            for (final var taskContext : taskContexts) {
+                currentLicensesMetadata = taskContext.getTask().execute(currentLicensesMetadata, currentState.nodes(), taskContext);
+            }
+            if (currentLicensesMetadata == originalLicensesMetadata) {
+                return currentState;
+            } else {
+                return ClusterState.builder(currentState)
+                    .metadata(Metadata.builder(currentState.metadata()).putCustom(LicensesMetadata.TYPE, currentLicensesMetadata))
+                    .build();
+            }
+        }
+    }
+
 }