浏览代码

[ML] Model snapshot upgrade needs a stats endpoint (#81641)

Previously the ML model snapshot upgrade endpoint did not
provide a way to reliably monitor progress. This could lead
to the upgrade assistant UI thinking that a model snapshot
upgrade had finished when it actually hadn't.

This change adds a new "stats" API that allows external
interested parties to find out the status of each model
snapshot upgrade and which node (if any) each is running on.

Fixes #81519
David Roberts 3 年之前
父节点
当前提交
0559dd087b
共有 16 个文件被更改,包括 1069 次插入127 次删除
  1. 156 0
      docs/reference/ml/anomaly-detection/apis/get-job-model-snapshot-upgrade-stats.asciidoc
  2. 1 0
      docs/reference/ml/anomaly-detection/apis/index.asciidoc
  3. 1 0
      docs/reference/ml/anomaly-detection/apis/ml-apis.asciidoc
  4. 40 0
      rest-api-spec/src/main/resources/rest-api-spec/api/ml.get_model_snapshot_upgrade_stats.json
  5. 50 16
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/util/ExpandedIdsMatcher.java
  6. 10 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java
  7. 295 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsAction.java
  8. 146 102
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/util/ExpandedIdsMatcherTests.java
  9. 27 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsActionRequestTests.java
  10. 54 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsActionResponseTests.java
  11. 5 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
  12. 130 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobModelSnapshotsUpgradeStatsAction.java
  13. 49 0
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/modelsnapshots/RestGetJobModelSnapshotsUpgradeStatsAction.java
  14. 1 0
      x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java
  15. 80 1
      x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/upgrade_job_snapshot.yml
  16. 24 8
      x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlJobSnapshotUpgradeIT.java

+ 156 - 0
docs/reference/ml/anomaly-detection/apis/get-job-model-snapshot-upgrade-stats.asciidoc

@@ -0,0 +1,156 @@
+[role="xpack"]
+[[ml-get-job-model-snapshot-upgrade-stats]]
+= Get {anomaly-job} model snapshot upgrade statistics API
+
+[subs="attributes"]
+++++
+<titleabbrev>Get model snapshot upgrade statistics</titleabbrev>
+++++
+
+Retrieves usage information for {anomaly-job} model snapshot upgrades.
+
+[[ml-get-job-model-snapshot-upgrade-stats-request]]
+== {api-request-title}
+
+`GET _ml/anomaly_detectors/<job_id>/model_snapshots/<snapshot_id>/_upgrade/_stats` +
+
+`GET _ml/anomaly_detectors/<job_id>,<job_id>/model_snapshots/_all/_upgrade/_stats` +
+
+`GET _ml/anomaly_detectors/_all/model_snapshots/_all/_upgrade/_stats`
+
+[[ml-get-job-model-snapshot-upgrade-stats-prereqs]]
+== {api-prereq-title}
+
+Requires the `monitor_ml` cluster privilege. This privilege is included in the
+`machine_learning_user` built-in role.
+
+[[ml-get-job-model-snapshot-upgrade-stats-desc]]
+== {api-description-title}
+
+{anomaly-detect-cap} job model snapshot upgrades are ephemeral. Only
+upgrades that are in progress at the time this API is called will be
+returned.
+
+[[ml-get-job-model-snapshot-upgrade-stats-path-parms]]
+== {api-path-parms-title}
+
+`<job_id>`::
+(string)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection-wildcard]
+
+`<snapshot_id>`::
+(string)
+Identifier for the model snapshot.
++
+You can get statistics for multiple {anomaly-job} model snapshot upgrades in a
+single API request by using a comma-separated list of snapshot IDs. You can also
+use wildcard expressions or `_all`.
+
+[[ml-get-job-model-snapshot-upgrade-stats-query-parms]]
+== {api-query-parms-title}
+
+`allow_no_match`::
+(Optional, Boolean)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=allow-no-match-jobs]
+
+[role="child_attributes"]
+[[ml-get-job-model-snapshot-upgrade-stats-results]]
+== {api-response-body-title}
+
+The API returns an array of {anomaly-job} model snapshot upgrade status objects.
+All of these properties are informational; you cannot update their values.
+
+`assignment_explanation`::
+(string)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=assignment-explanation-datafeeds]
+
+`job_id`::
+(string)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection]
+
+`node`::
+(object)
+Contains properties for the node that runs the upgrade task. This information is
+available only for upgrade tasks that are assigned to a node.
++
+--
+[%collapsible%open]
+====
+`attributes`:::
+(object)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-attributes]
+
+`ephemeral_id`:::
+(string)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-ephemeral-id]
+
+`id`:::
+(string)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-id]
+
+`name`:::
+(string)
+The node name. For example, `0-o0tOo`.
+
+`transport_address`:::
+(string)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=node-transport-address]
+====
+--
+
+`snapshot_id`::
+(string)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=model-snapshot-id]
+
+`state`::
+(string)
+One of `loading_old_state`, `saving_new_state`, `stopped` or `failed`.
+
+
+[[ml-get-job-model-snapshot-upgrade-stats-response-codes]]
+== {api-response-codes-title}
+
+`404` (Missing resources)::
+  If `allow_no_match` is `false`, this code indicates that there are no
+  resources that match the request or only partial matches for the request.
+
+[[ml-get-job-model-snapshot-upgrade-stats-example]]
+== {api-examples-title}
+
+[source,console]
+--------------------------------------------------
+GET _ml/anomaly_detectors/low_request_rate/model_snapshots/_all/_upgrade/_stats
+--------------------------------------------------
+// TEST[skip:it will be too difficult to get a reliable response in docs tests]
+
+The API returns the following results:
+
+[source,console-result]
+----
+{
+  "count" : 1,
+  "model_snapshot_upgrades" : [
+    {
+      "job_id" : "low_request_rate",
+      "snapshot_id" : "1828371",
+      "state" : "saving_new_state",
+      "node" : {
+        "id" : "7bmMXyWCRs-TuPfGJJ_yMw",
+        "name" : "node-0",
+        "ephemeral_id" : "hoXMLZB0RWKfR9UPPUCxXX",
+        "transport_address" : "127.0.0.1:9300",
+        "attributes" : {
+          "ml.machine_memory" : "17179869184",
+          "ml.max_open_jobs" : "512"
+        }
+      },
+      "assignment_explanation" : ""
+    }
+  ]
+}
+----
+// TESTRESPONSE[s/"7bmMXyWCRs-TuPfGJJ_yMw"/$body.$_path/]
+// TESTRESPONSE[s/"node-0"/$body.$_path/]
+// TESTRESPONSE[s/"hoXMLZB0RWKfR9UPPUCxXX"/$body.$_path/]
+// TESTRESPONSE[s/"127.0.0.1:9300"/$body.$_path/]
+// TESTRESPONSE[s/"17179869184"/$body.datafeeds.0.node.attributes.ml\\.machine_memory/]

+ 1 - 0
docs/reference/ml/anomaly-detection/apis/index.asciidoc

@@ -36,6 +36,7 @@ include::get-job.asciidoc[leveloffset=+2]
 include::get-job-stats.asciidoc[leveloffset=+2]
 include::get-ml-info.asciidoc[leveloffset=+2]
 include::get-snapshot.asciidoc[leveloffset=+2]
+include::get-job-model-snapshot-upgrade-stats.asciidoc[leveloffset=+2]
 include::get-overall-buckets.asciidoc[leveloffset=+2]
 include::get-calendar-event.asciidoc[leveloffset=+2]
 include::get-filter.asciidoc[leveloffset=+2]

+ 1 - 0
docs/reference/ml/anomaly-detection/apis/ml-apis.asciidoc

@@ -55,6 +55,7 @@ See also <<ml-df-analytics-apis>>.
 
 * <<ml-delete-snapshot,Delete model snapshot>>
 * <<ml-get-snapshot,Get model snapshot info>>
+* <<ml-get-job-model-snapshot-upgrade-stats,Get model snapshot upgrade statistics>>
 * <<ml-revert-snapshot,Revert model snapshot>>
 * <<ml-update-snapshot,Update model snapshot>>
 * <<ml-upgrade-job-model-snapshot,Upgrade model snapshot>>

+ 40 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/ml.get_model_snapshot_upgrade_stats.json

@@ -0,0 +1,40 @@
+{
+  "ml.get_model_snapshot_upgrade_stats":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-get-job-model-snapshot-upgrade-stats.html",
+      "description":"Gets stats for anomaly detection job model snapshot upgrades that are in progress."
+    },
+    "stability":"stable",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_ml/anomaly_detectors/{job_id}/model_snapshots/{snapshot_id}/_upgrade/_stats",
+          "methods":[
+            "GET"
+          ],
+          "parts":{
+            "job_id":{
+              "type":"string",
+              "description":"The ID of the job. May be a wildcard, comma separated list or `_all`."
+            },
+            "snapshot_id":{
+              "type":"string",
+              "description":"The ID of the snapshot. May be a wildcard, comma separated list or `_all`."
+            }
+          }
+        }
+      ]
+    },
+    "params":{
+      "allow_no_match":{
+        "type":"boolean",
+        "required":false,
+        "description":"Whether to ignore if a wildcard expression matches no jobs or no snapshots. (This includes the `_all` string.)"
+      }
+    }
+  }
+}

+ 50 - 16
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/action/util/ExpandedIdsMatcher.java

@@ -9,7 +9,10 @@ package org.elasticsearch.xpack.core.action.util;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.regex.Regex;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -43,7 +46,8 @@ public final class ExpandedIdsMatcher {
         return Strings.tokenizeToStringArray(expression, ",");
     }
 
-    private final LinkedList<IdMatcher> requiredMatches;
+    private final List<IdMatcher> allMatchers;
+    private final List<IdMatcher> requiredMatches;
     private final boolean onlyExact;
 
     /**
@@ -57,15 +61,18 @@ public final class ExpandedIdsMatcher {
      */
     public ExpandedIdsMatcher(String[] tokens, boolean allowNoMatchForWildcards) {
         requiredMatches = new LinkedList<>();
+        List<IdMatcher> allMatchers = new ArrayList<>();
 
         if (Strings.isAllOrWildcard(tokens)) {
             // if allowNoJobForWildcards == true then any number
             // of jobs with any id is ok. Therefore no matches
             // are required
 
+            IdMatcher matcher = new WildcardMatcher("*");
+            this.allMatchers = Collections.singletonList(matcher);
             if (allowNoMatchForWildcards == false) {
                 // require something, anything to match
-                requiredMatches.add(new WildcardMatcher("*"));
+                requiredMatches.add(matcher);
             }
             onlyExact = false;
             return;
@@ -78,23 +85,55 @@ public final class ExpandedIdsMatcher {
             // specific job Ids are
             for (String token : tokens) {
                 if (Regex.isSimpleMatchPattern(token)) {
+                    allMatchers.add(new WildcardMatcher(token));
                     atLeastOneWildcard = true;
                 } else {
-                    requiredMatches.add(new EqualsIdMatcher(token));
+                    IdMatcher matcher = new EqualsIdMatcher(token);
+                    allMatchers.add(matcher);
+                    requiredMatches.add(matcher);
                 }
             }
         } else {
             // Matches are required for wildcards
             for (String token : tokens) {
                 if (Regex.isSimpleMatchPattern(token)) {
-                    requiredMatches.add(new WildcardMatcher(token));
+                    IdMatcher matcher = new WildcardMatcher(token);
+                    allMatchers.add(matcher);
+                    requiredMatches.add(matcher);
                     atLeastOneWildcard = true;
                 } else {
-                    requiredMatches.add(new EqualsIdMatcher(token));
+                    IdMatcher matcher = new EqualsIdMatcher(token);
+                    allMatchers.add(matcher);
+                    requiredMatches.add(matcher);
                 }
             }
         }
         onlyExact = atLeastOneWildcard == false;
+        this.allMatchers = Collections.unmodifiableList(allMatchers);
+    }
+
+    /**
+     * Generate the list of required matches from the {@code expression}
+     * and initialize.
+     *
+     * @param expression Expression that will be tokenized into a set of wildcards or full Ids
+     * @param allowNoMatchForWildcards If true then it is not required for wildcard
+     *                                 expressions to match an Id meaning they are
+     *                                 not returned in the list of required matches
+     */
+    public ExpandedIdsMatcher(String expression, boolean allowNoMatchForWildcards) {
+        this(tokenizeExpression(expression), allowNoMatchForWildcards);
+    }
+
+    /**
+     * Test whether an ID matches any of the expressions.
+     * Unlike {@link #filterMatchedIds} this does not modify the state of
+     * the matcher.
+     * @param id ID to test.
+     * @return Does the ID match one or more of the patterns in the expression?
+     */
+    public boolean idMatches(String id) {
+        return allMatchers.stream().anyMatch(idMatcher -> idMatcher.matches(id));
     }
 
     /**
@@ -149,23 +188,18 @@ public final class ExpandedIdsMatcher {
      */
     public static class SimpleIdsMatcher {
 
-        private final LinkedList<IdMatcher> requiredMatches;
+        private final List<IdMatcher> matchers;
 
         public SimpleIdsMatcher(String[] tokens) {
-            requiredMatches = new LinkedList<>();
 
             if (Strings.isAllOrWildcard(tokens)) {
-                requiredMatches.add(new WildcardMatcher("*"));
+                matchers = Collections.singletonList(new WildcardMatcher("*"));
                 return;
             }
 
-            for (String token : tokens) {
-                if (Regex.isSimpleMatchPattern(token)) {
-                    requiredMatches.add(new WildcardMatcher(token));
-                } else {
-                    requiredMatches.add(new EqualsIdMatcher(token));
-                }
-            }
+            matchers = Arrays.stream(tokens)
+                .map(token -> Regex.isSimpleMatchPattern(token) ? new WildcardMatcher(token) : new EqualsIdMatcher(token))
+                .collect(Collectors.toList());
         }
 
         /**
@@ -175,7 +209,7 @@ public final class ExpandedIdsMatcher {
          * @return True if the given id is matched by any of the matchers
          */
         public boolean idMatches(String id) {
-            return requiredMatches.stream().anyMatch(idMatcher -> idMatcher.matches(id));
+            return matchers.stream().anyMatch(idMatcher -> idMatcher.matches(id));
         }
     }
 

+ 10 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java

@@ -313,6 +313,16 @@ public final class MlTasks {
         });
     }
 
+    public static Collection<PersistentTasksCustomMetadata.PersistentTask<?>> snapshotUpgradeTasks(
+        @Nullable PersistentTasksCustomMetadata tasks
+    ) {
+        if (tasks == null) {
+            return Collections.emptyList();
+        }
+
+        return tasks.findTasks(JOB_SNAPSHOT_UPGRADE_TASK_NAME, task -> true);
+    }
+
     public static Collection<PersistentTasksCustomMetadata.PersistentTask<?>> snapshotUpgradeTasksOnNode(
         @Nullable PersistentTasksCustomMetadata tasks,
         String nodeId

+ 295 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsAction.java

@@ -0,0 +1,295 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.master.MasterNodeReadRequest;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ToXContentObject;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse;
+import org.elasticsearch.xpack.core.action.util.QueryPage;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeState;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.elasticsearch.xpack.core.ml.action.UpgradeJobModelSnapshotAction.Request.SNAPSHOT_ID;
+
+public class GetJobModelSnapshotsUpgradeStatsAction extends ActionType<GetJobModelSnapshotsUpgradeStatsAction.Response> {
+
+    public static final GetJobModelSnapshotsUpgradeStatsAction INSTANCE = new GetJobModelSnapshotsUpgradeStatsAction();
+    public static final String NAME = "cluster:monitor/xpack/ml/job/model_snapshots/upgrade/stats/get";
+
+    public static final String ALL = "_all";
+    private static final String STATE = "state";
+    private static final String NODE = "node";
+    private static final String ASSIGNMENT_EXPLANATION = "assignment_explanation";
+
+    // Used for QueryPage
+    public static final ParseField RESULTS_FIELD = new ParseField("model_snapshot_upgrades");
+    public static String TYPE = "model_snapshot_upgrade";
+
+    private GetJobModelSnapshotsUpgradeStatsAction() {
+        super(NAME, GetJobModelSnapshotsUpgradeStatsAction.Response::new);
+    }
+
+    public static class Request extends MasterNodeReadRequest<GetJobModelSnapshotsUpgradeStatsAction.Request> {
+
+        public static final String ALLOW_NO_MATCH = "allow_no_match";
+
+        private final String jobId;
+        private final String snapshotId;
+        private boolean allowNoMatch = true;
+
+        public Request(String jobId, String snapshotId) {
+            this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
+            this.snapshotId = ExceptionsHelper.requireNonNull(snapshotId, SNAPSHOT_ID.getPreferredName());
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            jobId = in.readString();
+            snapshotId = in.readString();
+            allowNoMatch = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(jobId);
+            out.writeString(snapshotId);
+            out.writeBoolean(allowNoMatch);
+        }
+
+        public String getJobId() {
+            return jobId;
+        }
+
+        public String getSnapshotId() {
+            return snapshotId;
+        }
+
+        public boolean allowNoMatch() {
+            return allowNoMatch;
+        }
+
+        public void setAllowNoMatch(boolean allowNoMatch) {
+            this.allowNoMatch = allowNoMatch;
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(jobId, snapshotId, allowNoMatch);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            Request other = (Request) obj;
+            return Objects.equals(jobId, other.jobId)
+                && Objects.equals(snapshotId, other.snapshotId)
+                && Objects.equals(allowNoMatch, other.allowNoMatch);
+        }
+    }
+
+    public static class Response extends AbstractGetResourcesResponse<Response.JobModelSnapshotUpgradeStats> implements ToXContentObject {
+
+        public static class JobModelSnapshotUpgradeStats implements ToXContentObject, Writeable {
+
+            private final String jobId;
+            private final String snapshotId;
+            private final SnapshotUpgradeState upgradeState;
+            @Nullable
+            private final DiscoveryNode node;
+            @Nullable
+            private final String assignmentExplanation;
+
+            public JobModelSnapshotUpgradeStats(
+                String jobId,
+                String snapshotId,
+                SnapshotUpgradeState upgradeState,
+                @Nullable DiscoveryNode node,
+                @Nullable String assignmentExplanation
+            ) {
+                this.jobId = Objects.requireNonNull(jobId);
+                this.snapshotId = Objects.requireNonNull(snapshotId);
+                this.upgradeState = Objects.requireNonNull(upgradeState);
+                this.node = node;
+                this.assignmentExplanation = assignmentExplanation;
+            }
+
+            JobModelSnapshotUpgradeStats(StreamInput in) throws IOException {
+                jobId = in.readString();
+                snapshotId = in.readString();
+                upgradeState = SnapshotUpgradeState.fromStream(in);
+                node = in.readOptionalWriteable(DiscoveryNode::new);
+                assignmentExplanation = in.readOptionalString();
+            }
+
+            public String getJobId() {
+                return jobId;
+            }
+
+            public String getSnapshotId() {
+                return snapshotId;
+            }
+
+            public SnapshotUpgradeState getUpgradeState() {
+                return upgradeState;
+            }
+
+            public DiscoveryNode getNode() {
+                return node;
+            }
+
+            public String getAssignmentExplanation() {
+                return assignmentExplanation;
+            }
+
+            @Override
+            public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+                builder.startObject();
+                builder.field(Job.ID.getPreferredName(), jobId);
+                builder.field(SNAPSHOT_ID.getPreferredName(), snapshotId);
+                builder.field(STATE, upgradeState.toString());
+                if (node != null) {
+                    builder.startObject(NODE);
+                    builder.field("id", node.getId());
+                    builder.field("name", node.getName());
+                    builder.field("ephemeral_id", node.getEphemeralId());
+                    builder.field("transport_address", node.getAddress().toString());
+
+                    builder.startObject("attributes");
+                    for (Map.Entry<String, String> entry : node.getAttributes().entrySet()) {
+                        if (entry.getKey().startsWith("ml.")) {
+                            builder.field(entry.getKey(), entry.getValue());
+                        }
+                    }
+                    builder.endObject();
+                    builder.endObject();
+                }
+                if (assignmentExplanation != null) {
+                    builder.field(ASSIGNMENT_EXPLANATION, assignmentExplanation);
+                }
+                builder.endObject();
+                return builder;
+            }
+
+            @Override
+            public void writeTo(StreamOutput out) throws IOException {
+                out.writeString(jobId);
+                out.writeString(snapshotId);
+                upgradeState.writeTo(out);
+                out.writeOptionalWriteable(node);
+                out.writeOptionalString(assignmentExplanation);
+            }
+
+            @Override
+            public int hashCode() {
+                return Objects.hash(jobId, snapshotId, upgradeState, node, assignmentExplanation);
+            }
+
+            @Override
+            public boolean equals(Object obj) {
+                if (obj == null) {
+                    return false;
+                }
+                if (getClass() != obj.getClass()) {
+                    return false;
+                }
+                Response.JobModelSnapshotUpgradeStats other = (Response.JobModelSnapshotUpgradeStats) obj;
+                return Objects.equals(this.jobId, other.jobId)
+                    && Objects.equals(this.snapshotId, other.snapshotId)
+                    && Objects.equals(this.upgradeState, other.upgradeState)
+                    && Objects.equals(this.node, other.node)
+                    && Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
+            }
+
+            public static Response.JobModelSnapshotUpgradeStats.Builder builder(String jobId, String snapshotId) {
+                return new Response.JobModelSnapshotUpgradeStats.Builder(jobId, snapshotId);
+            }
+
+            public static class Builder {
+                private final String jobId;
+                private final String snapshotId;
+                private SnapshotUpgradeState upgradeState;
+                private DiscoveryNode node;
+                private String assignmentExplanation;
+
+                public Builder(String jobId, String snapshotId) {
+                    this.jobId = jobId;
+                    this.snapshotId = snapshotId;
+                }
+
+                public String getJobId() {
+                    return jobId;
+                }
+
+                public String getSnapshotId() {
+                    return snapshotId;
+                }
+
+                public Response.JobModelSnapshotUpgradeStats.Builder setUpgradeState(SnapshotUpgradeState upgradeState) {
+                    this.upgradeState = Objects.requireNonNull(upgradeState);
+                    return this;
+                }
+
+                public Response.JobModelSnapshotUpgradeStats.Builder setNode(DiscoveryNode node) {
+                    this.node = node;
+                    return this;
+                }
+
+                public Response.JobModelSnapshotUpgradeStats.Builder setAssignmentExplanation(String assignmentExplanation) {
+                    this.assignmentExplanation = assignmentExplanation;
+                    return this;
+                }
+
+                public Response.JobModelSnapshotUpgradeStats build() {
+                    return new Response.JobModelSnapshotUpgradeStats(jobId, snapshotId, upgradeState, node, assignmentExplanation);
+                }
+            }
+        }
+
+        public Response(QueryPage<Response.JobModelSnapshotUpgradeStats> upgradeStats) {
+            super(upgradeStats);
+        }
+
+        public Response(StreamInput in) throws IOException {
+            super(in);
+        }
+
+        public QueryPage<Response.JobModelSnapshotUpgradeStats> getResponse() {
+            return getResources();
+        }
+
+        @Override
+        protected Reader<Response.JobModelSnapshotUpgradeStats> getReader() {
+            return Response.JobModelSnapshotUpgradeStats::new;
+        }
+    }
+}

+ 146 - 102
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/action/util/ExpandedIdsMatcherTests.java

@@ -20,111 +20,155 @@ import static org.hamcrest.Matchers.oneOf;
 public class ExpandedIdsMatcherTests extends ESTestCase {
 
     public void testMatchingResourceIds() {
-        ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(new String[] { "*" }, false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        assertTrue(requiredMatches.hasUnmatchedIds());
-        requiredMatches.filterMatchedIds(Collections.singletonList("foo"));
-        assertFalse(requiredMatches.hasUnmatchedIds());
-        assertThat(requiredMatches.unmatchedIds(), empty());
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression(""), false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        requiredMatches.filterMatchedIds(Collections.singletonList("foo"));
-        assertThat(requiredMatches.unmatchedIds(), empty());
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression(null), false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        requiredMatches.filterMatchedIds(Collections.singletonList("foo"));
-        assertThat(requiredMatches.unmatchedIds(), empty());
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression(null), false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        requiredMatches.filterMatchedIds(Collections.emptyList());
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        assertThat(requiredMatches.unmatchedIds().get(0), equalTo("*"));
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression("_all"), false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        requiredMatches.filterMatchedIds(Collections.singletonList("foo"));
-        assertThat(requiredMatches.unmatchedIds(), empty());
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(new String[] { "foo*" }, false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        requiredMatches.filterMatchedIds(Arrays.asList("foo1", "foo2"));
-        assertThat(requiredMatches.unmatchedIds(), empty());
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(new String[] { "foo*", "bar" }, false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(2));
-        requiredMatches.filterMatchedIds(Arrays.asList("foo1", "foo2"));
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        assertEquals("bar", requiredMatches.unmatchedIds().get(0));
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(new String[] { "foo*", "bar" }, false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(2));
-        requiredMatches.filterMatchedIds(Arrays.asList("foo1", "bar"));
-        assertFalse(requiredMatches.hasUnmatchedIds());
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(new String[] { "foo*", "bar" }, false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(2));
-        requiredMatches.filterMatchedIds(Collections.singletonList("bar"));
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        assertEquals("foo*", requiredMatches.unmatchedIds().get(0));
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression("foo,bar,baz,wild*"), false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(4));
-        requiredMatches.filterMatchedIds(Arrays.asList("foo", "baz"));
-        assertThat(requiredMatches.unmatchedIds(), hasSize(2));
-        assertThat(requiredMatches.unmatchedIds().get(0), is(oneOf("bar", "wild*")));
-        assertThat(requiredMatches.unmatchedIds().get(1), is(oneOf("bar", "wild*")));
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(new String[] { "foo", "bar" }, false);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(2));
-        requiredMatches.filterMatchedIds(Collections.singletonList("bar"));
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        assertEquals("foo", requiredMatches.unmatchedIds().get(0));
-        assertTrue(requiredMatches.isOnlyExact());
+        ExpandedIdsMatcher matcher = new ExpandedIdsMatcher(new String[] { "*" }, false);
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        assertTrue(matcher.hasUnmatchedIds());
+        matcher.filterMatchedIds(Collections.singletonList("foo"));
+        assertFalse(matcher.hasUnmatchedIds());
+        assertThat(matcher.unmatchedIds(), empty());
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("bar"));
+
+        matcher = new ExpandedIdsMatcher("", false);
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        matcher.filterMatchedIds(Collections.singletonList("foo"));
+        assertThat(matcher.unmatchedIds(), empty());
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("bar"));
+
+        matcher = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression(null), false);
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        matcher.filterMatchedIds(Collections.singletonList("foo"));
+        assertThat(matcher.unmatchedIds(), empty());
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("bar"));
+
+        matcher = new ExpandedIdsMatcher(ExpandedIdsMatcher.tokenizeExpression(null), false);
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        matcher.filterMatchedIds(Collections.emptyList());
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        assertThat(matcher.unmatchedIds().get(0), equalTo("*"));
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("bar"));
+
+        matcher = new ExpandedIdsMatcher("_all", false);
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        matcher.filterMatchedIds(Collections.singletonList("foo"));
+        assertThat(matcher.unmatchedIds(), empty());
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("bar"));
+
+        matcher = new ExpandedIdsMatcher(new String[] { "foo*" }, false);
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        matcher.filterMatchedIds(Arrays.asList("foo1", "foo2"));
+        assertThat(matcher.unmatchedIds(), empty());
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("foo1"));
+        assertFalse(matcher.idMatches("bar"));
+
+        matcher = new ExpandedIdsMatcher(new String[] { "foo*", "bar" }, false);
+        assertThat(matcher.unmatchedIds(), hasSize(2));
+        matcher.filterMatchedIds(Arrays.asList("foo1", "foo2"));
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        assertEquals("bar", matcher.unmatchedIds().get(0));
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("foo1"));
+        assertTrue(matcher.idMatches("bar"));
+        assertFalse(matcher.idMatches("bar1"));
+
+        matcher = new ExpandedIdsMatcher(new String[] { "foo*", "bar" }, false);
+        assertThat(matcher.unmatchedIds(), hasSize(2));
+        matcher.filterMatchedIds(Arrays.asList("foo1", "bar"));
+        assertFalse(matcher.hasUnmatchedIds());
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("foo1"));
+        assertTrue(matcher.idMatches("bar"));
+        assertFalse(matcher.idMatches("bar1"));
+
+        matcher = new ExpandedIdsMatcher(new String[] { "foo*", "bar" }, false);
+        assertThat(matcher.unmatchedIds(), hasSize(2));
+        matcher.filterMatchedIds(Collections.singletonList("bar"));
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        assertEquals("foo*", matcher.unmatchedIds().get(0));
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("foo1"));
+        assertTrue(matcher.idMatches("bar"));
+        assertFalse(matcher.idMatches("bar1"));
+
+        matcher = new ExpandedIdsMatcher("foo,bar,baz,wild*", false);
+        assertThat(matcher.unmatchedIds(), hasSize(4));
+        matcher.filterMatchedIds(Arrays.asList("foo", "baz"));
+        assertThat(matcher.unmatchedIds(), hasSize(2));
+        assertThat(matcher.unmatchedIds().get(0), is(oneOf("bar", "wild*")));
+        assertThat(matcher.unmatchedIds().get(1), is(oneOf("bar", "wild*")));
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertFalse(matcher.idMatches("foo1"));
+        assertTrue(matcher.idMatches("bar"));
+        assertTrue(matcher.idMatches("wild"));
+        assertTrue(matcher.idMatches("wild1"));
+
+        matcher = new ExpandedIdsMatcher(new String[] { "foo", "bar" }, false);
+        assertThat(matcher.unmatchedIds(), hasSize(2));
+        matcher.filterMatchedIds(Collections.singletonList("bar"));
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        assertEquals("foo", matcher.unmatchedIds().get(0));
+        assertTrue(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertFalse(matcher.idMatches("foo1"));
+        assertTrue(matcher.idMatches("bar"));
     }
 
     public void testMatchingResourceIds_allowNoMatch() {
-        ExpandedIdsMatcher requiredMatches = new ExpandedIdsMatcher(new String[] { "*" }, true);
-        assertThat(requiredMatches.unmatchedIds(), empty());
-        assertFalse(requiredMatches.hasUnmatchedIds());
-        requiredMatches.filterMatchedIds(Collections.emptyList());
-        assertThat(requiredMatches.unmatchedIds(), empty());
-        assertFalse(requiredMatches.hasUnmatchedIds());
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(new String[] { "foo*", "bar" }, true);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        assertTrue(requiredMatches.hasUnmatchedIds());
-        requiredMatches.filterMatchedIds(Collections.singletonList("bar"));
-        assertThat(requiredMatches.unmatchedIds(), empty());
-        assertFalse(requiredMatches.hasUnmatchedIds());
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(new String[] { "foo*", "bar" }, true);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        requiredMatches.filterMatchedIds(Collections.emptyList());
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        assertEquals("bar", requiredMatches.unmatchedIds().get(0));
-        assertFalse(requiredMatches.isOnlyExact());
-
-        requiredMatches = new ExpandedIdsMatcher(new String[] { "foo", "bar" }, true);
-        assertThat(requiredMatches.unmatchedIds(), hasSize(2));
-        requiredMatches.filterMatchedIds(Collections.singletonList("bar"));
-        assertThat(requiredMatches.unmatchedIds(), hasSize(1));
-        assertEquals("foo", requiredMatches.unmatchedIds().get(0));
-        assertTrue(requiredMatches.isOnlyExact());
+        ExpandedIdsMatcher matcher = new ExpandedIdsMatcher(new String[] { "*" }, true);
+        assertThat(matcher.unmatchedIds(), empty());
+        assertFalse(matcher.hasUnmatchedIds());
+        matcher.filterMatchedIds(Collections.emptyList());
+        assertThat(matcher.unmatchedIds(), empty());
+        assertFalse(matcher.hasUnmatchedIds());
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("bar"));
+
+        matcher = new ExpandedIdsMatcher(new String[] { "foo*", "bar" }, true);
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        assertTrue(matcher.hasUnmatchedIds());
+        matcher.filterMatchedIds(Collections.singletonList("bar"));
+        assertThat(matcher.unmatchedIds(), empty());
+        assertFalse(matcher.hasUnmatchedIds());
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("foo1"));
+        assertTrue(matcher.idMatches("bar"));
+
+        matcher = new ExpandedIdsMatcher(new String[] { "foo*", "bar" }, true);
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        matcher.filterMatchedIds(Collections.emptyList());
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        assertEquals("bar", matcher.unmatchedIds().get(0));
+        assertFalse(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertTrue(matcher.idMatches("foo1"));
+        assertTrue(matcher.idMatches("bar"));
+
+        matcher = new ExpandedIdsMatcher(new String[] { "foo", "bar" }, true);
+        assertThat(matcher.unmatchedIds(), hasSize(2));
+        matcher.filterMatchedIds(Collections.singletonList("bar"));
+        assertThat(matcher.unmatchedIds(), hasSize(1));
+        assertEquals("foo", matcher.unmatchedIds().get(0));
+        assertTrue(matcher.isOnlyExact());
+        assertTrue(matcher.idMatches("foo"));
+        assertFalse(matcher.idMatches("foo1"));
+        assertTrue(matcher.idMatches("bar"));
     }
 
     public void testSimpleMatcher() {

+ 27 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsActionRequestTests.java

@@ -0,0 +1,27 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.action.GetJobModelSnapshotsUpgradeStatsAction.Request;
+
+public class GetJobModelSnapshotsUpgradeStatsActionRequestTests extends AbstractWireSerializingTestCase<Request> {
+
+    @Override
+    protected Request createTestInstance() {
+        Request request = new Request(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20));
+        request.setAllowNoMatch(randomBoolean());
+        return request;
+    }
+
+    @Override
+    protected Writeable.Reader<Request> instanceReader() {
+        return Request::new;
+    }
+}

+ 54 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobModelSnapshotsUpgradeStatsActionResponseTests.java

@@ -0,0 +1,54 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xpack.core.action.util.QueryPage;
+import org.elasticsearch.xpack.core.ml.action.GetJobModelSnapshotsUpgradeStatsAction.Response;
+import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeState;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+public class GetJobModelSnapshotsUpgradeStatsActionResponseTests extends AbstractWireSerializingTestCase<Response> {
+
+    @Override
+    protected Response createTestInstance() {
+
+        int listSize = randomInt(10);
+        List<Response.JobModelSnapshotUpgradeStats> statsList = new ArrayList<>(listSize);
+        for (int j = 0; j < listSize; j++) {
+            statsList.add(createRandomizedStat());
+        }
+
+        return new Response(new QueryPage<>(statsList, statsList.size(), GetJobModelSnapshotsUpgradeStatsAction.RESULTS_FIELD));
+    }
+
+    @Override
+    protected Writeable.Reader<Response> instanceReader() {
+        return Response::new;
+    }
+
+    public static Response.JobModelSnapshotUpgradeStats createRandomizedStat() {
+        Response.JobModelSnapshotUpgradeStats.Builder builder = Response.JobModelSnapshotUpgradeStats.builder(
+            randomAlphaOfLengthBetween(1, 20),
+            randomAlphaOfLengthBetween(1, 20)
+        ).setUpgradeState(randomFrom(SnapshotUpgradeState.values()));
+        if (randomBoolean()) {
+            builder.setNode(new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT));
+        } else {
+            builder.setAssignmentExplanation(randomAlphaOfLengthBetween(20, 50));
+        }
+        return builder.build();
+    }
+}

+ 5 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -121,6 +121,7 @@ import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDeploymentStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
 import org.elasticsearch.xpack.core.ml.action.GetInfluencersAction;
+import org.elasticsearch.xpack.core.ml.action.GetJobModelSnapshotsUpgradeStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetModelSnapshotsAction;
@@ -214,6 +215,7 @@ import org.elasticsearch.xpack.ml.action.TransportGetDatafeedsStatsAction;
 import org.elasticsearch.xpack.ml.action.TransportGetDeploymentStatsAction;
 import org.elasticsearch.xpack.ml.action.TransportGetFiltersAction;
 import org.elasticsearch.xpack.ml.action.TransportGetInfluencersAction;
+import org.elasticsearch.xpack.ml.action.TransportGetJobModelSnapshotsUpgradeStatsAction;
 import org.elasticsearch.xpack.ml.action.TransportGetJobsAction;
 import org.elasticsearch.xpack.ml.action.TransportGetJobsStatsAction;
 import org.elasticsearch.xpack.ml.action.TransportGetModelSnapshotsAction;
@@ -397,6 +399,7 @@ import org.elasticsearch.xpack.ml.rest.job.RestPostJobUpdateAction;
 import org.elasticsearch.xpack.ml.rest.job.RestPutJobAction;
 import org.elasticsearch.xpack.ml.rest.job.RestResetJobAction;
 import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestDeleteModelSnapshotAction;
+import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestGetJobModelSnapshotsUpgradeStatsAction;
 import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestGetModelSnapshotsAction;
 import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestRevertModelSnapshotAction;
 import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestUpdateModelSnapshotAction;
@@ -1187,6 +1190,7 @@ public class MachineLearning extends Plugin
             new RestGetTrainedModelsStatsAction(),
             new RestPutTrainedModelAction(),
             new RestUpgradeJobModelSnapshotAction(),
+            new RestGetJobModelSnapshotsUpgradeStatsAction(),
             new RestPutTrainedModelAliasAction(),
             new RestDeleteTrainedModelAliasAction(),
             new RestPreviewDataFrameAnalyticsAction(),
@@ -1277,6 +1281,7 @@ public class MachineLearning extends Plugin
             new ActionHandler<>(GetTrainedModelsStatsAction.INSTANCE, TransportGetTrainedModelsStatsAction.class),
             new ActionHandler<>(PutTrainedModelAction.INSTANCE, TransportPutTrainedModelAction.class),
             new ActionHandler<>(UpgradeJobModelSnapshotAction.INSTANCE, TransportUpgradeJobModelSnapshotAction.class),
+            new ActionHandler<>(GetJobModelSnapshotsUpgradeStatsAction.INSTANCE, TransportGetJobModelSnapshotsUpgradeStatsAction.class),
             new ActionHandler<>(PutTrainedModelAliasAction.INSTANCE, TransportPutTrainedModelAliasAction.class),
             new ActionHandler<>(DeleteTrainedModelAliasAction.INSTANCE, TransportDeleteTrainedModelAliasAction.class),
             new ActionHandler<>(PreviewDataFrameAnalyticsAction.INSTANCE, TransportPreviewDataFrameAnalyticsAction.class),

+ 130 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobModelSnapshotsUpgradeStatsAction.java

@@ -0,0 +1,130 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.action;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
+import org.elasticsearch.xpack.core.action.util.QueryPage;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.GetJobModelSnapshotsUpgradeStatsAction;
+import org.elasticsearch.xpack.core.ml.action.GetJobModelSnapshotsUpgradeStatsAction.Request;
+import org.elasticsearch.xpack.core.ml.action.GetJobModelSnapshotsUpgradeStatsAction.Response;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.snapshot.upgrade.SnapshotUpgradeTaskParams;
+import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class TransportGetJobModelSnapshotsUpgradeStatsAction extends TransportMasterNodeReadAction<Request, Response> {
+
+    private static final Logger logger = LogManager.getLogger(TransportGetJobModelSnapshotsUpgradeStatsAction.class);
+
+    private final JobConfigProvider jobConfigProvider;
+
+    @Inject
+    public TransportGetJobModelSnapshotsUpgradeStatsAction(
+        TransportService transportService,
+        ClusterService clusterService,
+        ThreadPool threadPool,
+        ActionFilters actionFilters,
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        JobConfigProvider jobConfigProvider
+    ) {
+        super(
+            GetJobModelSnapshotsUpgradeStatsAction.NAME,
+            transportService,
+            clusterService,
+            threadPool,
+            actionFilters,
+            Request::new,
+            indexNameExpressionResolver,
+            Response::new,
+            ThreadPool.Names.SAME
+        );
+        this.jobConfigProvider = jobConfigProvider;
+    }
+
+    @Override
+    protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) {
+        logger.debug(
+            () -> new ParameterizedMessage("[{}] get stats for model snapshot [{}] upgrades", request.getJobId(), request.getSnapshotId())
+        );
+        final PersistentTasksCustomMetadata tasksInProgress = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
+        final Collection<PersistentTasksCustomMetadata.PersistentTask<?>> snapshotUpgrades = MlTasks.snapshotUpgradeTasks(tasksInProgress);
+
+        // 2. Now that we have the job IDs, find the relevant model snapshot upgrades
+        ActionListener<List<Job.Builder>> expandIdsListener = ActionListener.wrap(jobs -> {
+            ExpandedIdsMatcher requiredSnapshotIdMatches = new ExpandedIdsMatcher(request.getSnapshotId(), request.allowNoMatch());
+            Set<String> jobIds = jobs.stream().map(Job.Builder::getId).collect(Collectors.toSet());
+            List<Response.JobModelSnapshotUpgradeStats> statsList = snapshotUpgrades.stream()
+                .filter(t -> jobIds.contains(((SnapshotUpgradeTaskParams) t.getParams()).getJobId()))
+                .filter(t -> requiredSnapshotIdMatches.idMatches(((SnapshotUpgradeTaskParams) t.getParams()).getSnapshotId()))
+                .map(t -> {
+                    SnapshotUpgradeTaskParams params = (SnapshotUpgradeTaskParams) t.getParams();
+                    Response.JobModelSnapshotUpgradeStats.Builder statsBuilder = Response.JobModelSnapshotUpgradeStats.builder(
+                        params.getJobId(),
+                        params.getSnapshotId()
+                    );
+                    if (t.getExecutorNode() != null) {
+                        statsBuilder.setNode(state.getNodes().get(t.getExecutorNode()));
+                    }
+                    return statsBuilder.setUpgradeState(MlTasks.getSnapshotUpgradeState(t))
+                        .setAssignmentExplanation(t.getAssignment().getExplanation())
+                        .build();
+                })
+                .sorted(
+                    Comparator.comparing(Response.JobModelSnapshotUpgradeStats::getJobId)
+                        .thenComparing(Response.JobModelSnapshotUpgradeStats::getSnapshotId)
+                )
+                .collect(Collectors.toList());
+            requiredSnapshotIdMatches.filterMatchedIds(
+                statsList.stream().map(Response.JobModelSnapshotUpgradeStats::getSnapshotId).collect(Collectors.toList())
+            );
+            if (requiredSnapshotIdMatches.hasUnmatchedIds()) {
+                listener.onFailure(
+                    new ResourceNotFoundException(
+                        "no snapshot upgrade is running for snapshot_id [{}]",
+                        requiredSnapshotIdMatches.unmatchedIdsString()
+                    )
+                );
+            } else {
+                listener.onResponse(
+                    new Response(new QueryPage<>(statsList, statsList.size(), GetJobModelSnapshotsUpgradeStatsAction.RESULTS_FIELD))
+                );
+            }
+        }, listener::onFailure);
+
+        // 1. Expand jobs - this will throw if a required job ID match isn't made
+        jobConfigProvider.expandJobs(request.getJobId(), request.allowNoMatch(), true, expandIdsListener);
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+    }
+}

+ 49 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/modelsnapshots/RestGetJobModelSnapshotsUpgradeStatsAction.java

@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.rest.modelsnapshots;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.ml.action.GetJobModelSnapshotsUpgradeStatsAction;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+import static org.elasticsearch.xpack.core.ml.action.UpgradeJobModelSnapshotAction.Request.SNAPSHOT_ID;
+import static org.elasticsearch.xpack.ml.MachineLearning.BASE_PATH;
+
+public class RestGetJobModelSnapshotsUpgradeStatsAction extends BaseRestHandler {
+
+    @Override
+    public List<RestHandler.Route> routes() {
+        return List.of(
+            new Route(GET, BASE_PATH + "anomaly_detectors/{" + Job.ID + "}/model_snapshots/{" + SNAPSHOT_ID + "}/_upgrade/_stats")
+        );
+    }
+
+    @Override
+    public String getName() {
+        return "ml_get_job_model_snapshot_upgrade_stats_action";
+    }
+
+    @Override
+    protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
+        String jobId = restRequest.param(Job.ID.getPreferredName());
+        String snapshotId = restRequest.param(SNAPSHOT_ID.getPreferredName());
+        GetJobModelSnapshotsUpgradeStatsAction.Request request = new GetJobModelSnapshotsUpgradeStatsAction.Request(jobId, snapshotId);
+        request.setAllowNoMatch(
+            restRequest.paramAsBoolean(GetJobModelSnapshotsUpgradeStatsAction.Request.ALLOW_NO_MATCH, request.allowNoMatch())
+        );
+        return channel -> client.execute(GetJobModelSnapshotsUpgradeStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
+    }
+}

+ 1 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -297,6 +297,7 @@ public class Constants {
         "cluster:monitor/xpack/ml/info/get",
         "cluster:monitor/xpack/ml/job/get",
         "cluster:monitor/xpack/ml/job/model_snapshots/get",
+        "cluster:monitor/xpack/ml/job/model_snapshots/upgrade/stats/get",
         "cluster:monitor/xpack/ml/job/results/buckets/get",
         "cluster:monitor/xpack/ml/job/results/categories/get",
         "cluster:monitor/xpack/ml/job/results/influencers/get",

+ 80 - 1
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/upgrade_job_snapshot.yml

@@ -17,6 +17,48 @@ setup:
             }
           }
 
+  # It's too hard to create a genuine model snapshot in a YAML test.
+  # All we can do is create the descriptor doc that will allow the
+  # endpoints to get past an existence check. Then the actual snapshot
+  # upgrade will of course fail, but we can test that the stats report
+  # the attempt.
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+        Content-Type: application/json
+      index:
+        index: .ml-anomalies-shared
+        id:    upgrade-model-snapshot_model_snapshot_1234567890
+        body:  >
+          {
+            "job_id": "upgrade-model-snapshot",
+            "min_version": "7.15.1",
+            "timestamp": "2021-12-13T03:04:05Z",
+            "description": "just for this test",
+            "snapshot_id": "1234567890",
+            "snapshot_doc_count": 1,
+            "latest_record_timestamp": "2021-12-13T02:03:04Z",
+            "latest_result_timestamp": "2021-12-13T01:02:03Z",
+            "retain":true
+          }
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+        Content-Type: application/json
+      index:
+        index: .ml-state-000001
+        id:    "upgrade-model-snapshot_model_state_1234567890#1"
+        body:  >
+          {
+          }
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      indices.refresh:
+        index: [.ml-anomalies-shared,.ml-state-000001]
+
 ---
 "Test with unknown job id":
   - do:
@@ -24,10 +66,47 @@ setup:
       ml.upgrade_job_snapshot:
         job_id: "non-existent-job"
         snapshot_id: "san"
+
 ---
 "Test with unknown snapshot id":
   - do:
       catch: missing
       ml.upgrade_job_snapshot:
         job_id: "upgrade-model-snapshot"
-        snapshot_id: "snapshot-9999"
+        snapshot_id: "9999999999"
+
+---
+"Test stats all snapshots":
+  - do:
+      ml.get_model_snapshot_upgrade_stats:
+        job_id: "upgrade-model-snapshot"
+        snapshot_id: "_all"
+  - match: { count: 0 }
+
+---
+"Test stats one snapshot":
+  - do:
+      catch: missing
+      ml.get_model_snapshot_upgrade_stats:
+        job_id: "upgrade-model-snapshot"
+        snapshot_id: "9999999999"
+
+---
+"Test existing but corrupt snapshot":
+  - skip:
+      version: all
+      reason: "@AwaitsFix https://github.com/elastic/elasticsearch/issues/81578"
+
+  - do:
+      ml.upgrade_job_snapshot:
+        job_id: "upgrade-model-snapshot"
+        snapshot_id: "1234567890"
+        wait_for_completion: false
+
+  - do:
+      ml.get_model_snapshot_upgrade_stats:
+        job_id: "upgrade-model-snapshot"
+        snapshot_id: "1234567890"
+  - match: { count: 1 }
+  - match: { model_snapshot_upgrades.0.job_id: "upgrade-model-snapshot" }
+  - match: { model_snapshot_upgrades.0.snapshot_id: "1234567890" }

+ 24 - 8
x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlJobSnapshotUpgradeIT.java

@@ -11,6 +11,8 @@ import org.elasticsearch.Version;
 import org.elasticsearch.client.MachineLearningClient;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseException;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.ml.CloseJobRequest;
@@ -162,15 +164,29 @@ public class MlJobSnapshotUpgradeIT extends AbstractUpgradeTestCase {
             .findFirst()
             .orElseThrow(() -> new ElasticsearchException("Not found snapshot other than " + currentSnapshot));
 
+        // Don't wait for completion in the initial upgrade call, but instead poll for status
+        // using the stats endpoint - this mimics what the Kibana upgrade assistant does
+        String snapshotToUpgrade = snapshot.getSnapshotId();
         assertThat(
-            hlrc.upgradeJobSnapshot(
-                new UpgradeJobModelSnapshotRequest(JOB_ID, snapshot.getSnapshotId(), null, true),
-                RequestOptions.DEFAULT
-            ).isCompleted(),
-            is(true)
+            hlrc.upgradeJobSnapshot(new UpgradeJobModelSnapshotRequest(JOB_ID, snapshotToUpgrade, null, false), RequestOptions.DEFAULT)
+                .isCompleted(),
+            is(false)
         );
 
-        List<ModelSnapshot> snapshots = getModelSnapshots(job.getId(), snapshot.getSnapshotId()).snapshots();
+        // Wait for completion by waiting for the persistent task to disappear
+        assertBusy(() -> {
+            try {
+                Response response = client().performRequest(
+                    new Request("GET", "_ml/anomaly_detectors/" + JOB_ID + "/model_snapshots/" + snapshotToUpgrade + "/_upgrade/_stats")
+                );
+                // Doing this instead of using expectThrows() on the line above means we get better diagnostics if the test fails
+                fail("Upgrade still in progress: " + entityAsMap(response));
+            } catch (ResponseException e) {
+                assertThat(e.getResponse().toString(), e.getResponse().getStatusLine().getStatusCode(), is(404));
+            }
+        }, 30, TimeUnit.SECONDS);
+
+        List<ModelSnapshot> snapshots = getModelSnapshots(job.getId(), snapshotToUpgrade).snapshots();
         assertThat(snapshots, hasSize(1));
         snapshot = snapshots.get(0);
         assertThat(snapshot.getLatestRecordTimeStamp(), equalTo(snapshots.get(0).getLatestRecordTimeStamp()));
@@ -184,11 +200,11 @@ public class MlJobSnapshotUpgradeIT extends AbstractUpgradeTestCase {
                 .getLatestRecordTimeStamp(),
             greaterThan(snapshot.getLatestRecordTimeStamp())
         );
-        RevertModelSnapshotRequest revertModelSnapshotRequest = new RevertModelSnapshotRequest(JOB_ID, snapshot.getSnapshotId());
+        RevertModelSnapshotRequest revertModelSnapshotRequest = new RevertModelSnapshotRequest(JOB_ID, snapshotToUpgrade);
         revertModelSnapshotRequest.setDeleteInterveningResults(true);
         assertThat(
             hlrc.revertModelSnapshot(revertModelSnapshotRequest, RequestOptions.DEFAULT).getModel().getSnapshotId(),
-            equalTo(snapshot.getSnapshotId())
+            equalTo(snapshotToUpgrade)
         );
         assertThat(openJob(JOB_ID).isOpened(), is(true));
         assertThat(