Browse Source

[ML] Extend use of MlConfigVersion to one more place (#99878)

In #97699 we moved away from Version to MlConfigVersion for
versioning ML configurations. However, one place was missed.
This PR adjusts that missing place, namely the metadata on
DFA destination indices. As a result of this change being
shipped in a later version than the original change, a hack
is required to map the intervening product versions to the
MlConfigVersion that was in force.
David Roberts 2 years ago
parent
commit
40029a77d1

+ 7 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlConfigVersion.java

@@ -321,13 +321,18 @@ public record MlConfigVersion(int id) implements VersionId<MlConfigVersion>, ToX
     }
 
     // Parse an MlConfigVersion from a string.
-    // Note that version "8.10.0" is silently converted to "10.0.0".
+    // Note that version "8.10.x" and "8.11.0" are silently converted to "10.0.0".
     // This is to support upgrade scenarios in pre-prod QA environments.
     public static MlConfigVersion fromString(String str) {
         if (str == null) {
             return CURRENT;
         }
-        if (str.equals("8.10.0")) {
+        // The whole switch from Version to MlConfigVersion was supposed to take
+        // place during development of 8.10.0, however, one place was missed. As
+        // a result there may be DFA destination indices in the wild with metadata
+        // containing 8.10.1, 8.10.2, 8.10.3 or 8.11.0. We can treat these as V_10
+        // for config version comparison purposes.
+        if (str.startsWith("8.10.") || str.equals("8.11.0")) {
             return V_10;
         }
         Matcher matcher = Pattern.compile("^(\\d+)\\.0\\.0$").matcher(str);

+ 1 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartDataFrameAnalyticsAction.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.core.ml.action;
 
 import org.elasticsearch.TransportVersion;
 import org.elasticsearch.TransportVersions;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.master.MasterNodeRequest;
@@ -148,7 +147,7 @@ public class StartDataFrameAnalyticsAction extends ActionType<NodeAcknowledgedRe
 
         public static final MlConfigVersion VERSION_INTRODUCED = MlConfigVersion.V_7_3_0;
         public static final TransportVersion TRANSPORT_VERSION_INTRODUCED = TransportVersions.V_7_3_0;
-        public static final Version VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED = Version.V_7_10_0;
+        public static final MlConfigVersion VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED = MlConfigVersion.V_7_10_0;
 
         public static final ConstructingObjectParser<TaskParams, Void> PARSER = new ConstructingObjectParser<>(
             MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME,

+ 0 - 13
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java

@@ -810,19 +810,6 @@ public class TransportStartDataFrameAnalyticsAction extends TransportMasterNodeA
                     + TaskParams.VERSION_INTRODUCED
                     + "] or higher";
             }
-            if (node.getVersion().before(TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED)
-                && params.getVersion().onOrAfter(MlConfigVersion.fromVersion(TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED))) {
-                return "Not opening job ["
-                    + id
-                    + "] on node ["
-                    + JobNodeSelector.nodeNameAndVersion(node)
-                    + "], because the data frame analytics created for version ["
-                    + params.getVersion()
-                    + "] requires a node of version "
-                    + "["
-                    + TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED
-                    + "] or higher";
-            }
 
             return null;
         }

+ 8 - 8
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndex.java

@@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.dataframe;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -33,6 +32,7 @@ import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.ml.MlConfigVersion;
 import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
@@ -95,7 +95,7 @@ public final class DestinationIndex {
      * If the results mappings change in a way existing destination indices will fail to index
      * the results, this should be bumped accordingly.
      */
-    public static final Version MIN_COMPATIBLE_VERSION =
+    public static final MlConfigVersion MIN_COMPATIBLE_VERSION =
         StartDataFrameAnalyticsAction.TaskParams.VERSION_DESTINATION_INDEX_MAPPINGS_CHANGED;
 
     private DestinationIndex() {}
@@ -202,7 +202,7 @@ public final class DestinationIndex {
         checkResultsFieldIsNotPresentInProperties(config, properties);
         properties.putAll(createAdditionalMappings(config, fieldCapabilitiesResponse));
         Map<String, Object> metadata = getOrPutDefault(mappingsAsMap, META, HashMap::new);
-        metadata.putAll(createMetadata(config.getId(), clock, Version.CURRENT));
+        metadata.putAll(createMetadata(config.getId(), clock, MlConfigVersion.CURRENT));
         if (config.getSource().getRuntimeMappings().isEmpty() == false) {
             Map<String, Object> runtimeMappings = getOrPutDefault(mappingsAsMap, RUNTIME, HashMap::new);
             runtimeMappings.putAll(config.getSource().getRuntimeMappings());
@@ -317,7 +317,7 @@ public final class DestinationIndex {
     }
 
     // Visible for testing
-    static Map<String, Object> createMetadata(String analyticsId, Clock clock, Version version) {
+    static Map<String, Object> createMetadata(String analyticsId, Clock clock, MlConfigVersion version) {
         Map<String, Object> metadata = new HashMap<>();
         metadata.put(CREATION_DATE_MILLIS, clock.millis());
         metadata.put(CREATED_BY, DFA_CREATOR);
@@ -403,11 +403,11 @@ public final class DestinationIndex {
     }
 
     @SuppressWarnings("unchecked")
-    private static Version getVersion(String jobId, Map<String, Object> meta) {
+    private static MlConfigVersion getVersion(String jobId, Map<String, Object> meta) {
         try {
             Map<String, Object> version = (Map<String, Object>) meta.get(VERSION);
             String createdVersionString = (String) version.get(CREATED);
-            return Version.fromString(createdVersionString);
+            return MlConfigVersion.fromString(createdVersionString);
         } catch (Exception e) {
             logger.error(() -> "[" + jobId + "] Could not retrieve destination index version", e);
             return null;
@@ -443,9 +443,9 @@ public final class DestinationIndex {
 
     private static class DestMetadata implements Metadata {
 
-        private final Version version;
+        private final MlConfigVersion version;
 
-        private DestMetadata(Version version) {
+        private DestMetadata(MlConfigVersion version) {
             this.version = version;
         }
 

+ 0 - 42
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsActionTests.java

@@ -103,48 +103,6 @@ public class TransportStartDataFrameAnalyticsActionTests extends ESTestCase {
         );
     }
 
-    // Cannot assign the node because none of the existing nodes is appropriate:
-    // - _node_name0 is too old (version 7.2.0)
-    // - _node_name1 is too old (version 7.9.1)
-    // - _node_name2 is too old (version 7.9.2)
-    public void testGetAssignment_MlNodesAreTooOld() {
-        TaskExecutor executor = createTaskExecutor();
-        TaskParams params = new TaskParams(JOB_ID, MlConfigVersion.CURRENT, false);
-        ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
-            .metadata(Metadata.builder().putCustom(MlMetadata.TYPE, new MlMetadata.Builder().build()))
-            .nodes(
-                DiscoveryNodes.builder()
-                    .add(createNode(0, true, Version.V_7_2_0, MlConfigVersion.V_7_2_0))
-                    .add(createNode(1, true, Version.V_7_9_1, MlConfigVersion.V_7_9_1))
-                    .add(createNode(2, true, Version.V_7_9_2, MlConfigVersion.V_7_9_2))
-            )
-            .build();
-
-        Assignment assignment = executor.getAssignment(params, clusterState.nodes().getAllNodes(), clusterState);
-        assertThat(assignment.getExecutorNode(), is(nullValue()));
-        assertThat(
-            assignment.getExplanation(),
-            allOf(
-                containsString(
-                    "Not opening job [data_frame_id] on node [{_node_name0}{version=7.2.0}], "
-                        + "because the data frame analytics requires a node of version [7.3.0] or higher"
-                ),
-                containsString(
-                    "Not opening job [data_frame_id] on node [{_node_name1}{version=7.9.1}], "
-                        + "because the data frame analytics created for version ["
-                        + MlConfigVersion.CURRENT
-                        + "] requires a node of version [7.10.0] or higher"
-                ),
-                containsString(
-                    "Not opening job [data_frame_id] on node [{_node_name2}{version=7.9.2}], "
-                        + "because the data frame analytics created for version ["
-                        + MlConfigVersion.CURRENT
-                        + "] requires a node of version [7.10.0] or higher"
-                )
-            )
-        );
-    }
-
     // The node can be assigned despite being newer than the job.
     // In such a case destination index will be created from scratch so that its mappings are up-to-date.
     public void testGetAssignment_MlNodeIsNewerThanTheMlJobButTheAssignmentSuceeds() {

+ 4 - 3
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/dataframe/DestinationIndexTests.java

@@ -37,6 +37,7 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.core.ml.MlConfigVersion;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
@@ -748,7 +749,7 @@ public class DestinationIndexTests extends ESTestCase {
 
     public void testReadMetadata_GivenCurrentVersion() {
         Map<String, Object> mappings = new HashMap<>();
-        mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), Version.CURRENT));
+        mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), MlConfigVersion.CURRENT));
         MappingMetadata mappingMetadata = mock(MappingMetadata.class);
         when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);
 
@@ -756,7 +757,7 @@ public class DestinationIndexTests extends ESTestCase {
 
         assertThat(metadata.hasMetadata(), is(true));
         assertThat(metadata.isCompatible(), is(true));
-        assertThat(metadata.getVersion(), equalTo(Version.CURRENT.toString()));
+        assertThat(metadata.getVersion(), equalTo(MlConfigVersion.CURRENT.toString()));
     }
 
     public void testReadMetadata_GivenMinCompatibleVersion() {
@@ -774,7 +775,7 @@ public class DestinationIndexTests extends ESTestCase {
 
     public void testReadMetadata_GivenIncompatibleVersion() {
         Map<String, Object> mappings = new HashMap<>();
-        mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), Version.V_7_9_3));
+        mappings.put("_meta", DestinationIndex.createMetadata("test_id", Clock.systemUTC(), MlConfigVersion.V_7_9_3));
         MappingMetadata mappingMetadata = mock(MappingMetadata.class);
         when(mappingMetadata.getSourceAsMap()).thenReturn(mappings);