瀏覽代碼

[ML] Add ML upgrade hooks for system indices upgrade (#79168)

Followup to #78542

This change adds the pre and post system index upgrade
hooks for the ML plugin. It's doing for ML what #78542
did for Watcher.

It's not yet possible to test system index upgrade
end-to-end as the complete framework doesn't exist yet.
David Roberts 4 年之前
父節點
當前提交
9853dddcbb

+ 41 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -1403,11 +1403,11 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
                 .setVersionMetaKey("version")
                 .setOrigin(ML_ORIGIN)
                 .build(),
-            getInferenceIndexSecurityDescriptor()
+            getInferenceIndexSystemIndexDescriptor()
         );
     }
 
-    public static SystemIndexDescriptor getInferenceIndexSecurityDescriptor() {
+    public static SystemIndexDescriptor getInferenceIndexSystemIndexDescriptor() {
         return SystemIndexDescriptor.builder()
             .setIndexPattern(InferenceIndexConstants.INDEX_PATTERN)
             .setPrimaryIndex(InferenceIndexConstants.LATEST_INDEX_NAME)
@@ -1419,11 +1419,49 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
             .build();
     }
 
+    @Override
+    public void prepareForIndicesMigration(ClusterService clusterService, Client client, ActionListener<Map<String, Object>> listener) {
+        boolean isAlreadyInUpgradeMode = MlMetadata.getMlMetadata(clusterService.state()).isUpgradeMode();
+        if (isAlreadyInUpgradeMode) {
+            // ML is already in upgrade mode, so nothing will write to the ML system indices during their upgrade
+            listener.onResponse(Collections.singletonMap("already_in_upgrade_mode", true));
+            return;
+        }
+
+        // Enable ML upgrade mode before upgrading the ML system indices to ensure nothing writes to them during the upgrade
+        Client originClient = new OriginSettingClient(client, ML_ORIGIN);
+        originClient.execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(true), ActionListener.wrap(
+            r -> listener.onResponse(Collections.singletonMap("already_in_upgrade_mode", false)),
+            listener::onFailure
+        ));
+    }
+
+    @Override
+    public void indicesMigrationComplete(
+        Map<String, Object> preUpgradeMetadata,
+        ClusterService clusterService,
+        Client client,
+        ActionListener<Boolean> listener
+    ) {
+        boolean wasAlreadyInUpgradeMode = (boolean) preUpgradeMetadata.getOrDefault("already_in_upgrade_mode", false);
+        if (wasAlreadyInUpgradeMode) {
+            // ML was already in upgrade mode before system indices upgrade started - we shouldn't disable it
+            listener.onResponse(true);
+            return;
+        }
+
+        Client originClient = new OriginSettingClient(client, ML_ORIGIN);
+        originClient.execute(SetUpgradeModeAction.INSTANCE, new SetUpgradeModeAction.Request(false), ActionListener.wrap(
+            r -> listener.onResponse(r.isAcknowledged()),
+            listener::onFailure
+        ));
+    }
+
     /**
      * These are the ML hidden indices. They are "associated" in the sense that if the ML system indices
      * are backed up or deleted then these hidden indices should also be backed up or deleted.
      */
-    private static Collection<AssociatedIndexDescriptor> ASSOCIATED_INDEX_DESCRIPTORS =
+    private static final Collection<AssociatedIndexDescriptor> ASSOCIATED_INDEX_DESCRIPTORS =
         List.of(
             new AssociatedIndexDescriptor(RESULTS_INDEX_PREFIX + "*", "Results indices"),
             new AssociatedIndexDescriptor(STATE_INDEX_PREFIX + "*", "State indices"),

+ 1 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -697,7 +697,7 @@ public class TransportStartDataFrameAnalyticsAction
             // Create the system index explicitly.  Although the master node would create it automatically on first use,
             // in a mixed version cluster where the master node is on an older version than this node relying on auto-creation
             // might use outdated mappings.
-            MlIndexAndAlias.createSystemIndexIfNecessary(client, clusterState, MachineLearning.getInferenceIndexSecurityDescriptor(),
+            MlIndexAndAlias.createSystemIndexIfNecessary(client, clusterState, MachineLearning.getInferenceIndexSystemIndexDescriptor(),
                 MlTasks.PERSISTENT_TASK_MASTER_NODE_TIMEOUT, indexCheckListener);
         }
 

+ 82 - 0
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningTests.java

@@ -6,20 +6,102 @@
  */
 package org.elasticsearch.xpack.ml;
 
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.monitor.os.OsStats;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.ml.MlMetadata;
+import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.startsWith;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
 public class MachineLearningTests extends ESTestCase {
 
+    @SuppressWarnings("unchecked")
+    public void testPrePostSystemIndexUpgrade_givenNotInUpgradeMode() {
+        ThreadPool threadpool = new TestThreadPool("test");
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
+        Client client = mock(Client.class);
+        when(client.threadPool()).thenReturn(threadpool);
+        doAnswer(invocationOnMock -> {
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[2];
+            listener.onResponse(AcknowledgedResponse.TRUE);
+            return null;
+        }).when(client).execute(same(SetUpgradeModeAction.INSTANCE), any(SetUpgradeModeAction.Request.class), any(ActionListener.class));
+
+        MachineLearning machineLearning = createMachineLearning(Settings.EMPTY);
+
+        SetOnce<Map<String, Object>> response = new SetOnce<>();
+        machineLearning.prepareForIndicesMigration(clusterService, client, ActionListener.wrap(
+            response::set,
+            e -> fail(e.getMessage())
+        ));
+
+        assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", false)));
+        verify(client).execute(same(SetUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeAction.Request(true)), any(ActionListener.class));
+
+        machineLearning.indicesMigrationComplete(response.get(), clusterService, client, ActionListener.wrap(
+            ESTestCase::assertTrue,
+            e -> fail(e.getMessage())
+        ));
+
+        verify(client).execute(same(SetUpgradeModeAction.INSTANCE), eq(new SetUpgradeModeAction.Request(false)), any(ActionListener.class));
+
+        threadpool.shutdown();
+    }
+
+    public void testPrePostSystemIndexUpgrade_givenAlreadyInUpgradeMode() {
+        ClusterService clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(
+            ClusterState.builder(ClusterName.DEFAULT)
+                .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().isUpgradeMode(true).build())).build());
+        Client client = mock(Client.class);
+
+        MachineLearning machineLearning = createMachineLearning(Settings.EMPTY);
+
+        SetOnce<Map<String, Object>> response = new SetOnce<>();
+        machineLearning.prepareForIndicesMigration(clusterService, client, ActionListener.wrap(
+            response::set,
+            e -> fail(e.getMessage())
+        ));
+
+        assertThat(response.get(), equalTo(Collections.singletonMap("already_in_upgrade_mode", true)));
+        verifyZeroInteractions(client);
+
+        machineLearning.indicesMigrationComplete(response.get(), clusterService, client, ActionListener.wrap(
+            ESTestCase::assertTrue,
+            e -> fail(e.getMessage())
+        ));
+
+        // Neither pre nor post should have called any action
+        verifyZeroInteractions(client);
+    }
+
     public void testMaxOpenWorkersSetting_givenDefault() {
         int maxOpenWorkers = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(Settings.EMPTY);
         assertEquals(512, maxOpenWorkers);