Browse Source

[ML] [Deprecation] add deprecation check for job model snapshots that need upgraded (#66062)

This adds checks that verify that machine learning anomaly job model snapshots support the required minimal version.

If any are not the required version, directions are given to either delete the model snapshot, or utilize the _upgrade API.

relates: https://github.com/elastic/elasticsearch/issues/64154
Benjamin Trent 4 years ago
parent
commit
4ab10c5ed1

+ 2 - 0
x-pack/plugin/deprecation/qa/rest/build.gradle

@@ -9,6 +9,7 @@ esplugin {
 }
 
 dependencies {
+  javaRestTestImplementation project(':client:rest-high-level')
   javaRestTestImplementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
   javaRestTestImplementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson}")
 }
@@ -25,6 +26,7 @@ restResources {
 testClusters.all {
   testDistribution = 'DEFAULT'
   setting 'xpack.security.enabled', 'false'
+  setting 'xpack.license.self_generated.type', 'trial'
 }
 
 tasks.named("test").configure { enabled = false }

+ 99 - 0
x-pack/plugin/deprecation/qa/rest/src/javaRestTest/java/org/elasticsearch/xpack/deprecation/MlDeprecationIT.java

@@ -0,0 +1,99 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.deprecation;
+
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.WarningsHandler;
+import org.elasticsearch.client.migration.DeprecationInfoRequest;
+import org.elasticsearch.client.migration.DeprecationInfoResponse;
+import org.elasticsearch.client.ml.PutJobRequest;
+import org.elasticsearch.client.ml.job.config.AnalysisConfig;
+import org.elasticsearch.client.ml.job.config.DataDescription;
+import org.elasticsearch.client.ml.job.config.Detector;
+import org.elasticsearch.client.ml.job.config.Job;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.test.rest.ESRestTestCase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasSize;
+
+public class MlDeprecationIT extends ESRestTestCase {
+
+    private static final RequestOptions REQUEST_OPTIONS = RequestOptions.DEFAULT.toBuilder()
+        .setWarningsHandler(WarningsHandler.PERMISSIVE)
+        .build();
+
+    private static class HLRC extends RestHighLevelClient {
+        HLRC(RestClient restClient) {
+            super(restClient, RestClient::close, new ArrayList<>());
+        }
+    }
+
+    @Override
+    protected NamedXContentRegistry xContentRegistry() {
+        SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
+        return new NamedXContentRegistry(searchModule.getNamedXContents());
+    }
+
+    @Override
+    protected boolean enableWarningsCheck() {
+        return false;
+    }
+
+    public void testMlDeprecationChecks() throws Exception {
+        HLRC hlrc = new HLRC(client());
+        String jobId = "deprecation_check_job";
+        hlrc.machineLearning()
+            .putJob(
+                new PutJobRequest(
+                    Job.builder(jobId)
+                        .setAnalysisConfig(
+                            AnalysisConfig.builder(Collections.singletonList(Detector.builder().setFunction("count").build()))
+                        )
+                        .setDataDescription(new DataDescription.Builder().setTimeField("time"))
+                        .build()
+                ),
+                REQUEST_OPTIONS
+            );
+
+        IndexRequest indexRequest = new IndexRequest(".ml-anomalies-.write-" + jobId).id(jobId + "_model_snapshot_1")
+            .source("{\"job_id\":\"deprecation_check_job\",\"snapshot_id\":\"1\", \"snapshot_doc_count\":1}", XContentType.JSON);
+        hlrc.index(indexRequest, REQUEST_OPTIONS);
+
+        indexRequest = new IndexRequest(".ml-anomalies-.write-" + jobId).id(jobId + "_model_snapshot_2")
+            .source(
+                "{\"job_id\":\"deprecation_check_job\",\"snapshot_id\":\"2\",\"snapshot_doc_count\":1,\"min_version\":\"8.0.0\"}",
+                XContentType.JSON
+            );
+        hlrc.index(indexRequest, REQUEST_OPTIONS);
+        hlrc.indices().refresh(new RefreshRequest(".ml-anomalies-*"), REQUEST_OPTIONS);
+
+        DeprecationInfoResponse response = hlrc.migration()
+            .getDeprecationInfo(
+                // specify an index so that deprecation checks don't run against any accidentally existing indices
+                new DeprecationInfoRequest(Collections.singletonList("index-that-does-not-exist-*")),
+                RequestOptions.DEFAULT
+            );
+        assertThat(response.getMlSettingsIssues(), hasSize(1));
+        assertThat(
+            response.getMlSettingsIssues().get(0).getMessage(),
+            containsString("model snapshot [1] for job [deprecation_check_job] needs to be deleted or upgraded")
+        );
+        hlrc.close();
+    }
+
+}

+ 50 - 2
x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/MlDeprecationChecker.java

@@ -6,17 +6,22 @@
 
 package org.elasticsearch.xpack.deprecation;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsAction;
+import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 
 public class MlDeprecationChecker implements DeprecationChecker {
@@ -45,6 +50,29 @@ public class MlDeprecationChecker implements DeprecationChecker {
         }
     }
 
+    static Optional<DeprecationIssue> checkModelSnapshot(ModelSnapshot modelSnapshot) {
+        if (modelSnapshot.getMinVersion().before(Version.V_7_0_0)) {
+            return Optional.of(new DeprecationIssue(DeprecationIssue.Level.CRITICAL,
+                String.format(
+                    Locale.ROOT,
+                    "model snapshot [%s] for job [%s] needs to be deleted or upgraded",
+                    modelSnapshot.getSnapshotId(),
+                    modelSnapshot.getJobId()
+                ),
+                "https://www.elastic.co/guide/en/elasticsearch/reference/master/ml-upgrade-job-model-snapshot.html",
+               String.format(
+                   Locale.ROOT,
+                   "model snapshot [%s] for job [%s] supports minimum version [%s] and needs to be at least [%s]",
+                   modelSnapshot.getSnapshotId(),
+                   modelSnapshot.getJobId(),
+                   modelSnapshot.getMinVersion(),
+                   Version.V_7_0_0
+               )
+            ));
+        }
+        return Optional.empty();
+    }
+
     @Override
     public boolean enabled(Settings settings) {
         return XPackSettings.MACHINE_LEARNING_ENABLED.get(settings);
@@ -56,16 +84,36 @@ public class MlDeprecationChecker implements DeprecationChecker {
             deprecationIssueListener.onResponse(new CheckResult(getName(), Collections.emptyList()));
             return;
         }
+        List<DeprecationIssue> issues = Collections.synchronizedList(new ArrayList<>());
+        final GetModelSnapshotsAction.Request getModelSnapshots = new GetModelSnapshotsAction.Request("*", null);
+        getModelSnapshots.setPageParams(new PageParams(0, 50));
+        getModelSnapshots.setSort(ModelSnapshot.MIN_VERSION.getPreferredName());
+
+        ActionListener<Void> getModelSnaphots = ActionListener.wrap(
+            _unused -> components.client().execute(
+                GetModelSnapshotsAction.INSTANCE,
+                getModelSnapshots,
+                ActionListener.wrap(
+                    modelSnapshots -> {
+                        modelSnapshots.getResources()
+                            .results()
+                            .forEach(modelSnapshot -> checkModelSnapshot(modelSnapshot)
+                                .ifPresent(issues::add));
+                        deprecationIssueListener.onResponse(new CheckResult(getName(), issues));
+                    },
+                    deprecationIssueListener::onFailure)
+            ),
+            deprecationIssueListener::onFailure);
+
         components.client().execute(
             GetDatafeedsAction.INSTANCE,
             new GetDatafeedsAction.Request(GetDatafeedsAction.ALL), ActionListener.wrap(
                 datafeedsResponse -> {
-                    List<DeprecationIssue> issues = new ArrayList<>();
                     for (DatafeedConfig df : datafeedsResponse.getResponse().results()) {
                         checkDataFeedAggregations(df, components.xContentRegistry()).ifPresent(issues::add);
                         checkDataFeedQuery(df, components.xContentRegistry()).ifPresent(issues::add);
                     }
-                    deprecationIssueListener.onResponse(new CheckResult(getName(), issues));
+                    getModelSnaphots.onResponse(null);
                 },
                 deprecationIssueListener::onFailure
             )

+ 34 - 1
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobResultsProviderIT.java

@@ -37,6 +37,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.action.util.PageParams;
@@ -647,16 +648,30 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
         Job.Builder job = createJob(jobId);
         indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("snap_2")
             .setTimestamp(Date.from(Instant.ofEpochMilli(10)))
+            .setMinVersion(Version.V_7_4_0)
             .build());
         indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("snap_1")
             .setTimestamp(Date.from(Instant.ofEpochMilli(11)))
+            .setMinVersion(Version.V_7_2_0)
             .build());
         indexModelSnapshot(new ModelSnapshot.Builder(jobId).setSnapshotId("other_snap")
             .setTimestamp(Date.from(Instant.ofEpochMilli(12)))
+            .setMinVersion(Version.V_7_3_0)
             .build());
+        createJob("other_job");
+        indexModelSnapshot(new ModelSnapshot.Builder("other_job").setSnapshotId("other_snap")
+            .setTimestamp(Date.from(Instant.ofEpochMilli(10)))
+            .setMinVersion(Version.CURRENT)
+            .build());
+        // Add a snapshot WITHOUT a min version.
+        client().prepareIndex(AnomalyDetectorsIndex.jobResultsAliasedName("other_job"))
+            .setId(ModelSnapshot.documentId("other_job", "11"))
+            .setSource("{\"job_id\":\"other_job\"," +
+                "\"snapshot_id\":\"11\", \"snapshot_doc_count\":1,\"retain\":false}", XContentType.JSON)
+            .get();
 
         client().admin().indices().prepareRefresh(AnomalyDetectorsIndex.jobStateIndexPattern(),
-            AnomalyDetectorsIndex.jobResultsAliasedName(jobId)).get();
+            AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*").get();
 
         PlainActionFuture<QueryPage<ModelSnapshot>> future = new PlainActionFuture<>();
         jobProvider.modelSnapshots(jobId, 0, 4, "9", "15", "", false, "snap_2,snap_1", future::onResponse, future::onFailure);
@@ -683,6 +698,24 @@ public class JobResultsProviderIT extends MlSingleNodeTestCase {
         assertThat(snapshots.get(0).getSnapshotId(), equalTo("snap_2"));
         assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1"));
         assertThat(snapshots.get(2).getSnapshotId(), equalTo("other_snap"));
+
+        future = new PlainActionFuture<>();
+        jobProvider.modelSnapshots("*",
+            0,
+            5,
+            null,
+            null,
+            "min_version",
+            false,
+            null,
+            future::onResponse,
+            future::onFailure);
+        snapshots = future.actionGet().results();
+        assertThat(snapshots.get(0).getSnapshotId(), equalTo("11"));
+        assertThat(snapshots.get(1).getSnapshotId(), equalTo("snap_1"));
+        assertThat(snapshots.get(2).getSnapshotId(), equalTo("other_snap"));
+        assertThat(snapshots.get(3).getSnapshotId(), equalTo("snap_2"));
+        assertThat(snapshots.get(4).getSnapshotId(), equalTo("other_snap"));
     }
 
     public void testGetAutodetectParams() throws Exception {

+ 20 - 9
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetModelSnapshotsAction.java

@@ -11,6 +11,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
@@ -53,19 +54,29 @@ public class TransportGetModelSnapshotsAction extends HandledTransportAction<Get
                 request.getSort(),
                 request.getDescOrder()));
 
+        if (Strings.isAllOrWildcard(request.getJobId())) {
+            getModelSnapshots(request, listener);
+            return;
+        }
         jobManager.jobExists(request.getJobId(), ActionListener.wrap(
-                ok -> {
-                    jobResultsProvider.modelSnapshots(request.getJobId(), request.getPageParams().getFrom(),
-                            request.getPageParams().getSize(), request.getStart(), request.getEnd(), request.getSort(),
-                            request.getDescOrder(), request.getSnapshotId(),
-                            page -> {
-                                listener.onResponse(new GetModelSnapshotsAction.Response(clearQuantiles(page)));
-                            }, listener::onFailure);
-                },
-                listener::onFailure
+            ok -> getModelSnapshots(request, listener),
+            listener::onFailure
         ));
     }
 
+    private void getModelSnapshots(GetModelSnapshotsAction.Request request, ActionListener<GetModelSnapshotsAction.Response> listener) {
+        jobResultsProvider.modelSnapshots(request.getJobId(),
+            request.getPageParams().getFrom(),
+            request.getPageParams().getSize(),
+            request.getStart(),
+            request.getEnd(),
+            request.getSort(),
+            request.getDescOrder(),
+            request.getSnapshotId(),
+            page -> listener.onResponse(new GetModelSnapshotsAction.Response(clearQuantiles(page))),
+            listener::onFailure);
+    }
+
     public static QueryPage<ModelSnapshot> clearQuantiles(QueryPage<ModelSnapshot> page) {
         if (page.results() == null) {
             return page;

+ 6 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsProvider.java

@@ -12,6 +12,7 @@ import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -1062,6 +1063,11 @@ public class JobResultsProvider {
 
         FieldSortBuilder sb = new FieldSortBuilder(sortField)
                 .order(sortDescending ? SortOrder.DESC : SortOrder.ASC);
+        // `min_version` might not be present in very early snapshots.
+        // Consequently, we should treat it as being at least from 6.3.0 or before
+        if (sortField.equals(ModelSnapshot.MIN_VERSION.getPreferredName())) {
+            sb.missing(Version.fromString("6.3.0"));
+        }
 
         String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
         LOGGER.trace("ES API CALL: search all model snapshots from index {} sort ascending {} with filter after sort from {} size {}",