Răsfoiți Sursa

Do not execute ML CRUD actions when upgrade mode is enabled (#54437)

Przemysław Witek 5 ani în urmă
părinte
comite
2c6df68ea7

+ 17 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterState;
@@ -35,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
 import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
+import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
 import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
@@ -63,6 +65,7 @@ import static org.elasticsearch.test.XContentTestUtils.convertToMap;
 import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
 import static org.elasticsearch.xpack.security.test.SecurityTestUtils.writeFile;
+import static org.hamcrest.Matchers.is;
 
 /**
  * Base class of ML integration tests that use a native autodetect process
@@ -136,6 +139,7 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
     }
 
     protected void cleanUp() {
+        setUpgradeModeTo(false);
         cleanUpResources();
         waitForPendingTasks();
     }
@@ -154,6 +158,19 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
         }
     }
 
+    protected void setUpgradeModeTo(boolean enabled) {
+        AcknowledgedResponse response =
+            client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(enabled)).actionGet();
+        assertThat(response.isAcknowledged(), is(true));
+        assertThat(upgradeMode(), is(enabled));
+    }
+
+    protected boolean upgradeMode() {
+        ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
+        MlMetadata mlMetadata = MlMetadata.getMlMetadata(masterClusterState);
+        return mlMetadata.isUpgradeMode();
+    }
+
     protected DeleteExpiredDataAction.Response deleteExpiredData() throws Exception {
         DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE,
             new DeleteExpiredDataAction.Request()).get();

+ 77 - 44
x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/SetUpgradeModeIT.java

@@ -6,19 +6,18 @@
 package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
-import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
+import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
+import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
 import org.junit.After;
 
@@ -31,8 +30,10 @@ import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createSched
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDatafeedStats;
 import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
@@ -49,35 +50,31 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
         String datafeedId = jobId + "-datafeed";
         startRealtime(jobId);
 
+        assertThat(upgradeMode(), is(false));
+
         // Assert appropriate task state and assignment numbers
         assertThat(client().admin()
             .cluster()
             .prepareListTasks()
             .setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]")
             .get()
-            .getTasks()
-            .size(), equalTo(2));
+            .getTasks(), hasSize(2));
 
         ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
 
         PersistentTasksCustomMetadata persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
-        assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
-        assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
-        assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(false));
+        assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1));
+        assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1));
 
         // Set the upgrade mode setting
-        AcknowledgedResponse response = client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(true))
-            .actionGet();
-
-        assertThat(response.isAcknowledged(), equalTo(true));
+        setUpgradeModeTo(true);
 
         masterClusterState = client().admin().cluster().prepareState().all().get().getState();
 
         // Assert state for tasks still exists and that the upgrade setting is set
         persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
-        assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
-        assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
-        assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(true));
+        assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1));
+        assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1));
 
         assertThat(client().admin()
             .cluster()
@@ -87,50 +84,81 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
             .getTasks(), is(empty()));
 
         GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId).get(0);
-        assertThat(jobStats.getState(), equalTo(JobState.OPENED));
-        assertThat(jobStats.getAssignmentExplanation(), equalTo(AWAITING_UPGRADE.getExplanation()));
+        assertThat(jobStats.getState(), is(equalTo(JobState.OPENED)));
+        assertThat(jobStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
         assertThat(jobStats.getNode(), is(nullValue()));
 
         GetDatafeedsStatsAction.Response.DatafeedStats datafeedStats = getDatafeedStats(datafeedId);
-        assertThat(datafeedStats.getDatafeedState(), equalTo(DatafeedState.STARTED));
-        assertThat(datafeedStats.getAssignmentExplanation(), equalTo(AWAITING_UPGRADE.getExplanation()));
+        assertThat(datafeedStats.getDatafeedState(), is(equalTo(DatafeedState.STARTED)));
+        assertThat(datafeedStats.getAssignmentExplanation(), is(equalTo(AWAITING_UPGRADE.getExplanation())));
         assertThat(datafeedStats.getNode(), is(nullValue()));
 
-        Job.Builder job = createScheduledJob("job-should-not-open");
-        registerJob(job);
-        putJob(job);
-        ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, () -> openJob(job.getId()));
-        assertThat(statusException.status(), equalTo(RestStatus.TOO_MANY_REQUESTS));
-        assertThat(statusException.getMessage(), equalTo("Cannot open jobs when upgrade mode is enabled"));
-
-        //Disable the setting
-        response = client().execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(false))
-            .actionGet();
-
-        assertThat(response.isAcknowledged(), equalTo(true));
+        // Disable the setting
+        setUpgradeModeTo(false);
 
         masterClusterState = client().admin().cluster().prepareState().all().get().getState();
 
         persistentTasks = masterClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
-        assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true).size(), equalTo(1));
-        assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true).size(), equalTo(1));
-        assertThat(MlMetadata.getMlMetadata(masterClusterState).isUpgradeMode(), equalTo(false));
+        assertThat(persistentTasks.findTasks(MlTasks.DATAFEED_TASK_NAME, task -> true), hasSize(1));
+        assertThat(persistentTasks.findTasks(MlTasks.JOB_TASK_NAME, task -> true), hasSize(1));
 
         assertBusy(() -> assertThat(client().admin()
             .cluster()
             .prepareListTasks()
             .setActions(MlTasks.JOB_TASK_NAME + "[c]", MlTasks.DATAFEED_TASK_NAME + "[c]")
             .get()
-            .getTasks()
-            .size(), equalTo(2)));
+            .getTasks(), hasSize(2)));
 
         jobStats = getJobStats(jobId).get(0);
-        assertThat(jobStats.getState(), equalTo(JobState.OPENED));
-        assertThat(jobStats.getAssignmentExplanation(), not(equalTo(AWAITING_UPGRADE.getExplanation())));
+        assertThat(jobStats.getState(), is(equalTo(JobState.OPENED)));
+        assertThat(jobStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
 
         datafeedStats = getDatafeedStats(datafeedId);
-        assertThat(datafeedStats.getDatafeedState(), equalTo(DatafeedState.STARTED));
-        assertThat(datafeedStats.getAssignmentExplanation(), not(equalTo(AWAITING_UPGRADE.getExplanation())));
+        assertThat(datafeedStats.getDatafeedState(), is(equalTo(DatafeedState.STARTED)));
+        assertThat(datafeedStats.getAssignmentExplanation(), is(not(equalTo(AWAITING_UPGRADE.getExplanation()))));
+    }
+
+    public void testJobOpenActionInUpgradeMode() {
+        String jobId = "job-should-not-open";
+        Job.Builder job = createScheduledJob(jobId);
+        registerJob(job);
+        putJob(job);
+
+        setUpgradeModeTo(true);
+
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> openJob(jobId));
+        assertThat(e.getMessage(), is(equalTo("Cannot perform cluster:admin/xpack/ml/job/open action while upgrade mode is enabled")));
+        assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS)));
+    }
+
+    public void testAnomalyDetectionActionsInUpgradeMode() {
+        setUpgradeModeTo(true);
+
+        String jobId = "job_id";
+        expectThrowsUpgradeModeException(() -> putJob(createScheduledJob(jobId)));
+        expectThrowsUpgradeModeException(() -> updateJob(jobId, null));
+        expectThrowsUpgradeModeException(() -> deleteJob(jobId));
+        expectThrowsUpgradeModeException(() -> openJob(jobId));
+        expectThrowsUpgradeModeException(() -> flushJob(jobId, false));
+        expectThrowsUpgradeModeException(() -> closeJob(jobId));
+        expectThrowsUpgradeModeException(() -> persistJob(jobId));
+        expectThrowsUpgradeModeException(() -> forecast(jobId, null, null));
+
+        String snapshotId = "snapshot_id";
+        expectThrowsUpgradeModeException(() -> revertModelSnapshot(jobId, snapshotId));
+
+        String datafeedId = "datafeed_id";
+        expectThrowsUpgradeModeException(() -> putDatafeed(createDatafeed(datafeedId, jobId, Collections.singletonList("index"))));
+        expectThrowsUpgradeModeException(() -> updateDatafeed(new DatafeedUpdate.Builder(datafeedId).build()));
+        expectThrowsUpgradeModeException(() -> deleteDatafeed(datafeedId));
+        expectThrowsUpgradeModeException(() -> startDatafeed(datafeedId, 0, null));
+        expectThrowsUpgradeModeException(() -> stopDatafeed(datafeedId));
+
+        String filterId = "filter_id";
+        expectThrowsUpgradeModeException(() -> putMlFilter(MlFilter.builder(filterId).build()));
+
+        String calendarId = "calendar_id";
+        expectThrowsUpgradeModeException(() -> putCalendar(calendarId, Collections.singletonList(jobId), ""));
     }
 
     private void startRealtime(String jobId) throws Exception {
@@ -154,8 +182,8 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
         startDatafeed(datafeedConfig.getId(), 0L, null);
         assertBusy(() -> {
             DataCounts dataCounts = getDataCounts(job.getId());
-            assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
-            assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
+            assertThat(dataCounts.getProcessedRecordCount(), is(equalTo(numDocs1)));
+            assertThat(dataCounts.getOutOfOrderTimeStampCount(), is(equalTo(0L)));
         });
 
         long numDocs2 = randomIntBetween(2, 64);
@@ -163,9 +191,14 @@ public class SetUpgradeModeIT extends MlNativeAutodetectIntegTestCase {
         indexDocs(logger, "data", numDocs2, now + 5000, now + 6000);
         assertBusy(() -> {
             DataCounts dataCounts = getDataCounts(job.getId());
-            assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2));
-            assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
+            assertThat(dataCounts.getProcessedRecordCount(), is(equalTo(numDocs1 + numDocs2)));
+            assertThat(dataCounts.getOutOfOrderTimeStampCount(), is(equalTo(0L)));
         }, 30, TimeUnit.SECONDS);
     }
 
+    private static void expectThrowsUpgradeModeException(ThrowingRunnable actionInvocation) {
+        ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, actionInvocation);
+        assertThat(e.getMessage(), containsString("upgrade mode is enabled"));
+        assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS)));
+    }
 }

+ 15 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -11,6 +11,7 @@ import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.ActionFilter;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.OriginSettingClient;
 import org.elasticsearch.client.node.NodeClient;
@@ -332,6 +333,7 @@ import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
 
 import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 
 public class MachineLearning extends Plugin implements SystemIndexPlugin, AnalysisPlugin, IngestPlugin, PersistentTaskPlugin {
     public static final String NAME = "ml";
@@ -428,6 +430,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
     private final SetOnce<DataFrameAnalyticsManager> dataFrameAnalyticsManager = new SetOnce<>();
     private final SetOnce<DataFrameAnalyticsAuditor> dataFrameAnalyticsAuditor = new SetOnce<>();
     private final SetOnce<MlMemoryTracker> memoryTracker = new SetOnce<>();
+    private final SetOnce<ActionFilter> mlUpgradeModeActionFilter = new SetOnce<>();
 
     public MachineLearning(Settings settings, Path configPath) {
         this.settings = settings;
@@ -530,9 +533,11 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
                                                IndexNameExpressionResolver indexNameExpressionResolver) {
         if (enabled == false) {
             // special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager, empty if ML is disabled
-            return Collections.singletonList(new JobManagerHolder());
+            return singletonList(new JobManagerHolder());
         }
 
+        this.mlUpgradeModeActionFilter.set(new MlUpgradeModeActionFilter(clusterService));
+
         new MlIndexTemplateRegistry(settings, clusterService, threadPool, client, xContentRegistry);
 
         AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
@@ -881,6 +886,15 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
                 infoAction);
     }
 
+    @Override
+    public List<ActionFilter> getActionFilters() {
+        if (enabled == false) {
+            return emptyList();
+        }
+
+        return singletonList(this.mlUpgradeModeActionFilter.get());
+    }
+
     @Override
     public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
         if (false == enabled) {

+ 148 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilter.java

@@ -0,0 +1,148 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml;
+
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.support.ActionFilter;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.xpack.core.ml.MlMetadata;
+import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteFilterAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteForecastAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteModelSnapshotAction;
+import org.elasticsearch.xpack.core.ml.action.DeleteTrainedModelAction;
+import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
+import org.elasticsearch.xpack.core.ml.action.FlushJobAction;
+import org.elasticsearch.xpack.core.ml.action.ForecastJobAction;
+import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
+import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
+import org.elasticsearch.xpack.core.ml.action.PersistJobAction;
+import org.elasticsearch.xpack.core.ml.action.PostCalendarEventsAction;
+import org.elasticsearch.xpack.core.ml.action.PostDataAction;
+import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
+import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
+import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
+import org.elasticsearch.xpack.core.ml.action.PutJobAction;
+import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAction;
+import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
+import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
+import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
+import org.elasticsearch.xpack.core.ml.action.UpdateCalendarJobAction;
+import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
+import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction;
+import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
+import org.elasticsearch.xpack.core.ml.action.UpdateModelSnapshotAction;
+import org.elasticsearch.xpack.core.ml.action.UpdateProcessAction;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link MlUpgradeModeActionFilter} disallows certain actions if the cluster is currently in upgrade mode.
+ *
+ * Disallowed actions are the ones which can access/alter the state of ML internal indices.
+ */
+class MlUpgradeModeActionFilter extends ActionFilter.Simple {
+
+    private static final Set<String> ACTIONS_DISALLOWED_IN_UPGRADE_MODE =
+        Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            PutJobAction.NAME,
+            UpdateJobAction.NAME,
+            DeleteJobAction.NAME,
+            OpenJobAction.NAME,
+            FlushJobAction.NAME,
+            CloseJobAction.NAME,
+            PersistJobAction.NAME,
+
+            FinalizeJobExecutionAction.NAME,
+            PostDataAction.NAME,
+
+            RevertModelSnapshotAction.NAME,
+            UpdateModelSnapshotAction.NAME,
+            DeleteModelSnapshotAction.NAME,
+
+            PutDatafeedAction.NAME,
+            UpdateDatafeedAction.NAME,
+            DeleteDatafeedAction.NAME,
+            StartDatafeedAction.NAME,
+            StopDatafeedAction.NAME,
+
+            PutFilterAction.NAME,
+            UpdateFilterAction.NAME,
+            DeleteFilterAction.NAME,
+
+            PutCalendarAction.NAME,
+            UpdateCalendarJobAction.NAME,
+            PostCalendarEventsAction.NAME,
+            DeleteCalendarAction.NAME,
+            DeleteCalendarEventAction.NAME,
+
+            UpdateProcessAction.NAME,
+            KillProcessAction.NAME,
+
+            DeleteExpiredDataAction.NAME,
+
+            ForecastJobAction.NAME,
+            DeleteForecastAction.NAME,
+
+            PutDataFrameAnalyticsAction.NAME,
+            DeleteDataFrameAnalyticsAction.NAME,
+            StartDataFrameAnalyticsAction.NAME,
+            StopDataFrameAnalyticsAction.NAME,
+
+            PutTrainedModelAction.NAME,
+            DeleteTrainedModelAction.NAME
+        )));
+
+    private final AtomicBoolean isUpgradeMode = new AtomicBoolean();
+
+    MlUpgradeModeActionFilter(ClusterService clusterService) {
+        Objects.requireNonNull(clusterService);
+        clusterService.addListener(this::setIsUpgradeMode);
+    }
+
+    @Override
+    protected boolean apply(String action, ActionRequest request, ActionListener<?> listener) {
+        if (isUpgradeMode.get() && ACTIONS_DISALLOWED_IN_UPGRADE_MODE.contains(action)) {
+            throw new ElasticsearchStatusException(
+                "Cannot perform {} action while upgrade mode is enabled", RestStatus.TOO_MANY_REQUESTS, action);
+        }
+        return true;
+    }
+
+    /**
+     * To prevent leaking information to unauthorized users, it is extremely important that this filter is executed *after* the
+     * {@code SecurityActionFilter}.
+     * To achieve that, the number returned by this method must be greater than the number returned by the
+     * {@code SecurityActionFilter::order} method.
+     */
+    @Override
+    public int order() {
+        return Integer.MAX_VALUE;
+    }
+
+    // Visible for testing
+    void setIsUpgradeMode(ClusterChangedEvent event) {
+        isUpgradeMode.set(MlMetadata.getMlMetadata(event.state()).isUpgradeMode());
+    }
+}

+ 115 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlUpgradeModeActionFilterTests.java

@@ -0,0 +1,115 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml;
+
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.support.ActionFilter;
+import org.elasticsearch.action.support.ActionFilterChain;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.ml.MlMetadata;
+import org.elasticsearch.xpack.core.ml.action.PutJobAction;
+import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
+import org.elasticsearch.xpack.security.action.filter.SecurityActionFilter;
+import org.junit.After;
+import org.junit.Before;
+
+import static org.hamcrest.Matchers.arrayContaining;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+public class MlUpgradeModeActionFilterTests extends ESTestCase {
+
+    private static final String DISALLOWED_ACTION = PutJobAction.NAME;
+    private static final String ALLOWED_ACTION = SetUpgradeModeAction.NAME;
+
+    private ClusterService clusterService;
+    private Task task;
+    private ActionRequest request;
+    private ActionListener<ActionResponse> listener;
+    private ActionFilterChain<ActionRequest, ActionResponse> chain;
+
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setUpMocks() {
+        clusterService = mock(ClusterService.class);
+        task = mock(Task.class);
+        request = mock(ActionRequest.class);
+        listener = mock(ActionListener.class);
+        chain = mock(ActionFilterChain.class);
+    }
+
+    @After
+    public void assertNoMoreInteractions() {
+        verifyNoMoreInteractions(task, request, listener, chain);
+    }
+
+    public void testApply_ActionDisallowedInUpgradeMode() {
+        MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService);
+        filter.apply(task, DISALLOWED_ACTION, request, listener, chain);
+
+        filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true)));
+        ElasticsearchStatusException e =
+            expectThrows(
+                ElasticsearchStatusException.class,
+                () -> filter.apply(task, DISALLOWED_ACTION, request, listener, chain));
+
+        filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false)));
+        filter.apply(task, DISALLOWED_ACTION, request, listener, chain);
+
+        assertThat(e.getMessage(), is(equalTo("Cannot perform " + DISALLOWED_ACTION + " action while upgrade mode is enabled")));
+        assertThat(e.status(), is(equalTo(RestStatus.TOO_MANY_REQUESTS)));
+
+        verify(chain, times(2)).proceed(task, DISALLOWED_ACTION, request, listener);
+    }
+
+    public void testApply_ActionAllowedInUpgradeMode() {
+        MlUpgradeModeActionFilter filter = new MlUpgradeModeActionFilter(clusterService);
+        filter.apply(task, ALLOWED_ACTION, request, listener, chain);
+
+        filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(true)));
+        filter.apply(task, ALLOWED_ACTION, request, listener, chain);
+
+        filter.setIsUpgradeMode(createClusterChangedEvent(createClusterState(false)));
+        filter.apply(task, ALLOWED_ACTION, request, listener, chain);
+
+        verify(chain, times(3)).proceed(task, ALLOWED_ACTION, request, listener);
+    }
+
+    public void testOrder_UpgradeFilterIsExecutedAfterSecurityFilter() {
+        MlUpgradeModeActionFilter upgradeModeFilter = new MlUpgradeModeActionFilter(clusterService);
+        SecurityActionFilter securityFilter = new SecurityActionFilter(null, null, null, mock(ThreadPool.class), null, null);
+
+        ActionFilter[] actionFiltersInOrderOfExecution = new ActionFilters(Sets.newHashSet(upgradeModeFilter, securityFilter)).filters();
+        assertThat(actionFiltersInOrderOfExecution, is(arrayContaining(securityFilter, upgradeModeFilter)));
+    }
+
+    private static ClusterChangedEvent createClusterChangedEvent(ClusterState clusterState) {
+        return new ClusterChangedEvent("created-from-test", clusterState, clusterState);
+    }
+
+    private static ClusterState createClusterState(boolean isUpgradeMode) {
+        return ClusterState.builder(new ClusterName("MlUpgradeModeActionFilterTests"))
+            .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(isUpgradeMode).build()))
+            .build();
+    }
+}

+ 6 - 6
x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml

@@ -220,11 +220,6 @@ teardown:
 
 ---
 "Attempt to open job when upgrade_mode is enabled":
-  - do:
-      ml.set_upgrade_mode:
-        enabled: true
-  - match: { acknowledged: true }
-
   - do:
       ml.put_job:
         job_id: failing-set-upgrade-mode-job
@@ -243,6 +238,11 @@ teardown:
           }
 
   - do:
-      catch: /Cannot open jobs when upgrade mode is enabled/
+      ml.set_upgrade_mode:
+        enabled: true
+  - match: { acknowledged: true }
+
+  - do:
+      catch: /Cannot perform cluster:admin/xpack/ml/job/open action while upgrade mode is enabled/
       ml.open_job:
         job_id: failing-set-upgrade-mode-job