
[ML] Reset anomaly detection job API (#73908)

Adds a new API that allows a user to reset
an anomaly detection job.

To use the API:

```
POST _ml/anomaly_detectors/<job_id>/_reset
```
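
A successful synchronous reset simply returns an acknowledgement, matching the example in the new reset-job docs added by this commit:

```
{
  "acknowledged": true
}
```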

The API removes all data associated with the job.
In particular, it deletes the model state, results, and stats.

However, job notifications and user annotations are not removed.

The API can also be called asynchronously by setting the parameter
`wait_for_completion` to `false` (it defaults to `true`). When run
that way, the API returns the task id so that progress can be monitored.
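
For illustration, the asynchronous flow looks roughly like this; the task id is an example value, and `GET _tasks/<task_id>` is the standard task management API for checking on it:

```
POST _ml/anomaly_detectors/total-requests/_reset?wait_for_completion=false

{
  "task": "oTUltX4IQMOUUVeiohTt8A:39"
}

GET _tasks/oTUltX4IQMOUUVeiohTt8A:39
```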

In order to prevent the job from opening while it is resetting,
a new job field called `blocked` has been added. It is an object
that contains a `reason` and a `task_id`. The `reason` can be one of
`delete`, `reset`, or `revert`, as all of these operations should
block the job from opening. The `task_id` is included so that the
blocking task can be tracked if necessary.
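
As a rough sketch (field values are illustrative), a job that is mid-reset would report something like the following in the get jobs API response:

```
"blocked": {
  "reason": "reset",
  "task_id": "oTUltX4IQMOUUVeiohTt8A:39"
}
```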

Finally, this commit also sets the `blocked` field when
the revert snapshot API is called, as a job should not be opened
while it is being reverted to a different model snapshot.
Dimitris Athanasiou · 4 years ago · commit dc61a72c9e
37 changed files with 1344 additions and 171 deletions
  1. docs/reference/ml/anomaly-detection/apis/get-job.asciidoc (+18 -0)
  2. docs/reference/ml/anomaly-detection/apis/index.asciidoc (+2 -0)
  3. docs/reference/ml/anomaly-detection/apis/ml-apis.asciidoc (+1 -0)
  4. docs/reference/ml/anomaly-detection/apis/reset-job.asciidoc (+86 -0)
  5. rest-api-spec/src/main/resources/rest-api-spec/api/ml.reset_job.json (+36 -0)
  6. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetJobAction.java (+118 -0)
  7. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Blocked.java (+111 -0)
  8. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java (+53 -8)
  9. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java (+37 -8)
  10. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java (+2 -0)
  11. x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json (+10 -0)
  12. x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/ResetJobRequestTests.java (+27 -0)
  13. x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/BlockedTests.java (+58 -0)
  14. x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java (+40 -0)
  15. x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java (+3 -0)
  16. x-pack/plugin/ml/qa/ml-with-security/build.gradle (+2 -0)
  17. x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java (+7 -3)
  18. x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java (+6 -0)
  19. x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java (+1 -28)
  20. x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java (+36 -6)
  21. x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java (+84 -0)
  22. x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java (+1 -1)
  23. x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java (+16 -10)
  24. x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java (+1 -1)
  25. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java (+5 -0)
  26. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java (+46 -7)
  27. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetJobAction.java (+212 -0)
  28. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java (+98 -25)
  29. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java (+6 -1)
  30. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java (+43 -51)
  31. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java (+3 -3)
  32. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java (+33 -13)
  33. x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestResetJobAction.java (+76 -0)
  34. x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java (+8 -5)
  35. x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java (+10 -1)
  36. x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java (+1 -0)
  37. x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/reset_job.yml (+47 -0)

+ 18 - 0
docs/reference/ml/anomaly-detection/apis/get-job.asciidoc

@@ -59,6 +59,24 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=exclude-generated]
 The API returns an array of {anomaly-job} resources. For the full list of
 properties, see <<ml-put-job-request-body,create {anomaly-jobs} API>>.
 
+//Begin blocked
+`blocked`::
+(object) When present, it indicates that a task is currently being executed
+on the job that blocks it from opening.
++
+.Properties of `blocked`
+[%collapsible%open]
+====
+`reason`:::
+(string) The reason the job is blocked. Values may be `delete`, `reset`, `revert`.
+Each value means the corresponding action is being executed.
+
+`task_id`:::
+(string) The task id of the blocking action. You can use the <<tasks>> API to
+monitor progress.
+====
+//End blocked
+
 `create_time`::
 (string) The time the job was created. For example, `1491007356077`. This
 property is informational; you cannot change its value.

+ 2 - 0
docs/reference/ml/anomaly-detection/apis/index.asciidoc

@@ -46,6 +46,8 @@ include::open-job.asciidoc[leveloffset=+2]
 include::post-data.asciidoc[leveloffset=+2]
 //PREVIEW
 include::preview-datafeed.asciidoc[leveloffset=+2]
+//RESET
+include::reset-job.asciidoc[leveloffset=+2]
 //REVERT
 include::revert-snapshot.asciidoc[leveloffset=+2]
 //SET/START/STOP

+ 1 - 0
docs/reference/ml/anomaly-detection/apis/ml-apis.asciidoc

@@ -19,6 +19,7 @@ See also <<ml-df-analytics-apis>>.
 * <<ml-post-data,Post data to {anomaly-jobs}>>
 * <<ml-update-job,Update {anomaly-jobs}>>
 * <<ml-forecast,Create>> or <<ml-delete-forecast,delete forecasts>>
+* <<ml-reset-job,Reset {anomaly-jobs}>> 
 
 
 [discrete]

+ 86 - 0
docs/reference/ml/anomaly-detection/apis/reset-job.asciidoc

@@ -0,0 +1,86 @@
+[role="xpack"]
+[testenv="platinum"]
+[[ml-reset-job]]
+= Reset {anomaly-jobs} API
+++++
+<titleabbrev>Reset jobs</titleabbrev>
+++++
+
+Resets an existing {anomaly-job}.
+
+[[ml-reset-job-request]]
+== {api-request-title}
+
+`POST _ml/anomaly_detectors/<job_id>/_reset`
+
+[[ml-reset-job-prereqs]]
+== {api-prereq-title}
+
+* Requires the `manage_ml` cluster privilege. This privilege is included in the 
+`machine_learning_admin` built-in role.
+* Before you can reset a job, you must close it. You can set `force` to `true`
+when closing the job to avoid waiting for the job to complete. See 
+<<ml-close-job>>.
+
+[[ml-reset-job-desc]]
+== {api-description-title}
+
+All model state and results are deleted.
+The job is ready to start over as if it had just been created.
+
+It is not currently possible to reset multiple jobs using wildcards or a
+comma-separated list.
+
+[[ml-reset-job-path-parms]]
+== {api-path-parms-title}
+
+`<job_id>`::
+(Required, string)
+include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-anomaly-detection]
+
+[[ml-reset-job-query-parms]]
+== {api-query-parms-title}
+
+`wait_for_completion`::
+  (Optional, Boolean) Specifies whether the request should return immediately or
+  wait until the job reset completes. Defaults to `true`.
+
+[[ml-reset-job-example]]
+== {api-examples-title}
+
+[source,console]
+--------------------------------------------------
+POST _ml/anomaly_detectors/total-requests/_reset
+--------------------------------------------------
+// TEST[skip:setup:server_metrics_job]
+
+When the job is reset, you receive the following results:
+
+[source,console-result]
+----
+{
+  "acknowledged": true
+}
+----
+
+In the next example we reset the `total-requests` job asynchronously:
+
+[source,console]
+--------------------------------------------------
+POST _ml/anomaly_detectors/total-requests/_reset?wait_for_completion=false
+--------------------------------------------------
+// TEST[skip:setup:server_metrics_job]
+
+When `wait_for_completion` is set to `false`, the response contains the id
+of the job reset task:
+
+[source,console-result]
+----
+{
+  "task": "oTUltX4IQMOUUVeiohTt8A:39"
+}
+----
+// TESTRESPONSE[s/"task": "oTUltX4IQMOUUVeiohTt8A:39"/"task": $body.task/]
+
+If you want to check the status of the reset task, use the <<tasks>> API by
+referencing the task ID.

+ 36 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/ml.reset_job.json

@@ -0,0 +1,36 @@
+{
+  "ml.reset_job":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-reset-job.html",
+      "description":"Resets an existing anomaly detection job."
+    },
+    "stability":"stable",
+    "visibility":"public",
+    "headers":{
+      "accept": [ "application/json"]
+    },
+    "url":{
+      "paths":[
+        {
+          "path":"/_ml/anomaly_detectors/{job_id}/_reset",
+          "methods":[
+            "POST"
+          ],
+          "parts":{
+            "job_id":{
+              "type":"string",
+              "description":"The ID of the job to reset"
+            }
+          }
+        }
+      ]
+    },
+    "params":{
+      "wait_for_completion":{
+        "type":"boolean",
+        "description":"Should this request wait until the operation has completed before returning",
+        "default":true
+      }
+    }
+  }
+}

+ 118 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetJobAction.java

@@ -0,0 +1,118 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+public class ResetJobAction extends ActionType<AcknowledgedResponse> {
+
+    public static final String NAME = "cluster:admin/xpack/ml/job/reset";
+    public static final ResetJobAction INSTANCE = new ResetJobAction();
+
+    public static final Version VERSION_INTRODUCED = Version.V_7_14_0;
+
+    private ResetJobAction() {
+        super(NAME, AcknowledgedResponse::readFrom);
+    }
+
+    public static class Request extends AcknowledgedRequest<Request> {
+
+        private String jobId;
+
+        /**
+         * Internal parameter that allows resetting an open job
+         * when a job is reallocated to a new node.
+         */
+        private boolean skipJobStateValidation;
+
+        /**
+         * Should this task store its result?
+         */
+        private boolean shouldStoreResult;
+
+        public Request(String jobId) {
+            this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID);
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            jobId = in.readString();
+            skipJobStateValidation = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(jobId);
+            out.writeBoolean(skipJobStateValidation);
+        }
+
+        public void setSkipJobStateValidation(boolean skipJobStateValidation) {
+            this.skipJobStateValidation = skipJobStateValidation;
+        }
+
+        public boolean isSkipJobStateValidation() {
+            return skipJobStateValidation;
+        }
+
+        /**
+         * Should this task store its result after it has finished?
+         */
+        public void setShouldStoreResult(boolean shouldStoreResult) {
+            this.shouldStoreResult = shouldStoreResult;
+        }
+
+        @Override
+        public boolean getShouldStoreResult() {
+            return shouldStoreResult;
+        }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, MlTasks.JOB_TASK_ID_PREFIX + jobId, parentTaskId, headers);
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        public String getJobId() {
+            return jobId;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(jobId, skipJobStateValidation);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || o.getClass() != getClass()) return false;
+            Request that = (Request) o;
+            return Objects.equals(jobId, that.jobId) && skipJobStateValidation == that.skipJobStateValidation;
+        }
+    }
+}

+ 111 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Blocked.java

@@ -0,0 +1,111 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.job.config;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ParseField;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.core.Nullable;
+import org.elasticsearch.tasks.TaskId;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Objects;
+
+public class Blocked implements ToXContentObject, Writeable {
+
+    public enum Reason {
+        NONE, DELETE, RESET, REVERT;
+
+        public static Reason fromString(String value) {
+            return Reason.valueOf(value.toUpperCase(Locale.ROOT));
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    public static final ParseField REASON = new ParseField("reason");
+    public static final ParseField TASK_ID = new ParseField("task_id");
+
+    public static final ConstructingObjectParser<Blocked, Void> LENIENT_PARSER = createParser(true);
+    public static final ConstructingObjectParser<Blocked, Void> STRICT_PARSER = createParser(false);
+
+    private static ConstructingObjectParser<Blocked, Void> createParser(boolean ignoreUnknownFields) {
+        ConstructingObjectParser<Blocked, Void> parser = new ConstructingObjectParser<>("blocked", ignoreUnknownFields,
+            a -> new Blocked((Reason) a[0], (TaskId) a[1]));
+        parser.declareString(ConstructingObjectParser.constructorArg(), Reason::fromString, REASON);
+        parser.declareString(ConstructingObjectParser.optionalConstructorArg(), TaskId::new, TASK_ID);
+        return parser;
+    }
+
+    private final Reason reason;
+
+    @Nullable
+    private final TaskId taskId;
+
+    public Blocked(Reason reason, @Nullable TaskId taskId) {
+        this.reason = Objects.requireNonNull(reason);
+        this.taskId = taskId;
+    }
+
+    public Blocked(StreamInput in) throws IOException {
+        this.reason = in.readEnum(Reason.class);
+        this.taskId = in.readOptionalWriteable(TaskId::readFromStream);
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeEnum(reason);
+        out.writeOptionalWriteable(taskId);
+    }
+
+    public Reason getReason() {
+        return reason;
+    }
+
+    @Nullable
+    public TaskId getTaskId() {
+        return taskId;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(REASON.getPreferredName(), reason);
+        if (taskId != null) {
+            builder.field(TASK_ID.getPreferredName(), taskId.toString());
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(reason, taskId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Blocked that = (Blocked) o;
+        return Objects.equals(reason, that.reason) && Objects.equals(taskId, that.taskId);
+    }
+
+    public static Blocked none() {
+        return new Blocked(Reason.NONE, null);
+    }
+}

+ 53 - 8
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java

@@ -83,6 +83,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
     public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
     public static final ParseField DELETING = new ParseField("deleting");
     public static final ParseField ALLOW_LAZY_OPEN = new ParseField("allow_lazy_open");
+    public static final ParseField BLOCKED = new ParseField("blocked");
 
     // Used for QueryPage
     public static final ParseField RESULTS_FIELD = new ParseField("jobs");
@@ -136,6 +137,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
         parser.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
         parser.declareBoolean(Builder::setDeleting, DELETING);
         parser.declareBoolean(Builder::setAllowLazyOpen, ALLOW_LAZY_OPEN);
+        parser.declareObject(Builder::setBlocked, ignoreUnknownFields ? Blocked.LENIENT_PARSER : Blocked.STRICT_PARSER, BLOCKED);
 
         return parser;
     }
@@ -170,6 +172,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
     private final String resultsIndexName;
     private final boolean deleting;
     private final boolean allowLazyOpen;
+    private final Blocked blocked;
 
     private Job(String jobId, String jobType, Version jobVersion, List<String> groups, String description,
                 Date createTime, Date finishedTime,
@@ -177,7 +180,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
                 ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
                 Long modelSnapshotRetentionDays, Long dailyModelSnapshotRetentionAfterDays, Long resultsRetentionDays,
                 Map<String, Object> customSettings, String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName,
-                boolean deleting, boolean allowLazyOpen) {
+                boolean deleting, boolean allowLazyOpen, Blocked blocked) {
 
         this.jobId = jobId;
         this.jobType = jobType;
@@ -199,8 +202,19 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
         this.modelSnapshotId = modelSnapshotId;
         this.modelSnapshotMinVersion = modelSnapshotMinVersion;
         this.resultsIndexName = resultsIndexName;
-        this.deleting = deleting;
         this.allowLazyOpen = allowLazyOpen;
+
+        if (deleting == false && blocked.getReason() == Blocked.Reason.DELETE) {
+            this.deleting = true;
+        } else {
+            this.deleting = deleting;
+        }
+
+        if (deleting && blocked.getReason() != Blocked.Reason.DELETE) {
+            this.blocked = new Blocked(Blocked.Reason.DELETE, null);
+        } else {
+            this.blocked = blocked;
+        }
     }
 
     public Job(StreamInput in) throws IOException {
@@ -231,6 +245,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
         resultsIndexName = in.readString();
         deleting = in.readBoolean();
         allowLazyOpen = in.readBoolean();
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            blocked = new Blocked(in);
+        } else {
+            blocked = deleting ? new Blocked(Blocked.Reason.DELETE, null) : Blocked.none();
+        }
     }
 
     /**
@@ -416,6 +435,10 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
         return allowLazyOpen;
     }
 
+    public Blocked getBlocked() {
+        return blocked;
+    }
+
     /**
      * Get all input data fields mentioned in the job configuration,
      * namely analysis fields and the time field.
@@ -503,6 +526,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
         out.writeString(resultsIndexName);
         out.writeBoolean(deleting);
         out.writeBoolean(allowLazyOpen);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            blocked.writeTo(out);
+        }
     }
 
     @Override
@@ -578,6 +604,9 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
         }
         builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
         builder.field(ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen);
+        if (blocked.getReason() != Blocked.Reason.NONE) {
+            builder.field(BLOCKED.getPreferredName(), blocked);
+        }
         return builder;
     }
 
@@ -613,7 +642,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
                 && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
                 && Objects.equals(this.resultsIndexName, that.resultsIndexName)
                 && Objects.equals(this.deleting, that.deleting)
-                && Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
+                && Objects.equals(this.allowLazyOpen, that.allowLazyOpen)
+                && Objects.equals(this.blocked, that.blocked);
     }
 
     @Override
@@ -621,7 +651,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
         return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime,
                 analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
                 backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays,
-                customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen);
+                customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen, blocked);
     }
 
     // Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString()
@@ -671,6 +701,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
         private String resultsIndexName;
         private boolean deleting;
         private boolean allowLazyOpen;
+        private Blocked blocked = Blocked.none();
 
         public Builder() {
         }
@@ -702,6 +733,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
             this.resultsIndexName = job.getResultsIndexNameNoPrefix();
             this.deleting = job.isDeleting();
             this.allowLazyOpen = job.allowLazyOpen();
+            this.blocked = job.getBlocked();
         }
 
         public Builder(StreamInput in) throws IOException {
@@ -731,6 +763,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
             resultsIndexName = in.readOptionalString();
             deleting = in.readBoolean();
             allowLazyOpen = in.readBoolean();
+            if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+                blocked = new Blocked(in);
+            } else {
+                blocked = Blocked.none();
+            }
         }
 
         public Builder setId(String id) {
@@ -865,6 +902,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
             return this;
         }
 
+        public Builder setBlocked(Blocked blocked) {
+            this.blocked = ExceptionsHelper.requireNonNull(blocked, BLOCKED);
+            return this;
+        }
+
         /**
          * Return the list of fields that have been set and are invalid to
          * be set when the job is created e.g. model snapshot Id should not
@@ -930,9 +972,11 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
             out.writeOptionalString(resultsIndexName);
             out.writeBoolean(deleting);
             out.writeBoolean(allowLazyOpen);
+            if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+                blocked.writeTo(out);
+            }
         }
 
-        @Override
         public boolean equals(Object o) {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
@@ -959,7 +1003,8 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
                     && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
                     && Objects.equals(this.resultsIndexName, that.resultsIndexName)
                     && Objects.equals(this.deleting, that.deleting)
-                    && Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
+                    && Objects.equals(this.allowLazyOpen, that.allowLazyOpen)
+                    && Objects.equals(this.blocked, that.blocked);
         }
 
         @Override
@@ -967,7 +1012,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
             return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription,
                     createTime, finishedTime, modelPlotConfig, renormalizationWindowDays,
                     backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays,
-                    customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen);
+                    customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen, blocked);
         }
 
         /**
@@ -1126,7 +1171,7 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
                     id, jobType, jobVersion, groups, description, createTime, finishedTime,
                     analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
                     backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays,
-                    customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen);
+                    customSettings, modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting, allowLazyOpen, blocked);
         }
 
         private void checkValidBackgroundPersistInterval() {

+ 37 - 8
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdate.java

@@ -66,6 +66,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
         INTERNAL_PARSER.declareString(Builder::setModelSnapshotMinVersion, Job.MODEL_SNAPSHOT_MIN_VERSION);
         INTERNAL_PARSER.declareString(Builder::setJobVersion, Job.JOB_VERSION);
         INTERNAL_PARSER.declareBoolean(Builder::setClearFinishTime, CLEAR_JOB_FINISH_TIME);
+        INTERNAL_PARSER.declareObject(Builder::setBlocked, Blocked.STRICT_PARSER, Job.BLOCKED);
     }
 
     private final String jobId;
@@ -87,6 +88,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
     private final Version jobVersion;
     private final Boolean clearJobFinishTime;
     private final Boolean allowLazyOpen;
+    private final Blocked blocked;
 
     private JobUpdate(String jobId, @Nullable List<String> groups, @Nullable String description,
                       @Nullable List<DetectorUpdate> detectorUpdates, @Nullable ModelPlotConfig modelPlotConfig,
@@ -97,7 +99,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
                       @Nullable PerPartitionCategorizationConfig perPartitionCategorizationConfig,
                       @Nullable Map<String, Object> customSettings, @Nullable String modelSnapshotId,
                       @Nullable Version modelSnapshotMinVersion, @Nullable Version jobVersion, @Nullable Boolean clearJobFinishTime,
-                      @Nullable Boolean allowLazyOpen) {
+                      @Nullable Boolean allowLazyOpen, @Nullable Blocked blocked) {
         this.jobId = jobId;
         this.groups = groups;
         this.description = description;
@@ -117,6 +119,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
         this.jobVersion = jobVersion;
         this.clearJobFinishTime = clearJobFinishTime;
         this.allowLazyOpen = allowLazyOpen;
+        this.blocked = blocked;
     }
 
     public JobUpdate(StreamInput in) throws IOException {
@@ -156,6 +159,11 @@ public class JobUpdate implements Writeable, ToXContentObject {
             modelSnapshotMinVersion = null;
         }
         allowLazyOpen = in.readOptionalBoolean();
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            blocked = in.readOptionalWriteable(Blocked::new);
+        } else {
+            blocked = null;
+        }
     }
 
     @Override
@@ -196,6 +204,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
             out.writeBoolean(false);
         }
         out.writeOptionalBoolean(allowLazyOpen);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeOptionalWriteable(blocked);
+        }
     }
 
     public String getJobId() {
@@ -278,6 +289,10 @@ public class JobUpdate implements Writeable, ToXContentObject {
         return modelPlotConfig != null || perPartitionCategorizationConfig != null || detectorUpdates != null || groups != null;
     }
 
+    public Blocked getBlocked() {
+        return blocked;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
@@ -336,6 +351,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
         if (allowLazyOpen != null) {
             builder.field(Job.ALLOW_LAZY_OPEN.getPreferredName(), allowLazyOpen);
         }
+        if (blocked != null) {
+            builder.field(Job.BLOCKED.getPreferredName(), blocked);
+        }
         builder.endObject();
         return builder;
     }
@@ -485,6 +503,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
         if (allowLazyOpen != null) {
             builder.setAllowLazyOpen(allowLazyOpen);
         }
+        if (blocked != null) {
+            builder.setBlocked(blocked);
+        }
 
         builder.setAnalysisConfig(newAnalysisConfig);
         return builder.build();
@@ -511,7 +532,8 @@ public class JobUpdate implements Writeable, ToXContentObject {
                 && (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion()))
                 && (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion()))
                 && (clearJobFinishTime == null || clearJobFinishTime == false || job.getFinishedTime() == null)
-                && (allowLazyOpen == null || Objects.equals(allowLazyOpen, job.allowLazyOpen()));
+                && (allowLazyOpen == null || Objects.equals(allowLazyOpen, job.allowLazyOpen()))
+                && (blocked == null || Objects.equals(blocked, job.getBlocked()));
     }
 
     boolean updatesDetectors(Job job) {
@@ -562,7 +584,8 @@ public class JobUpdate implements Writeable, ToXContentObject {
                 && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion)
                 && Objects.equals(this.jobVersion, that.jobVersion)
                 && Objects.equals(this.clearJobFinishTime, that.clearJobFinishTime)
-                && Objects.equals(this.allowLazyOpen, that.allowLazyOpen);
+                && Objects.equals(this.allowLazyOpen, that.allowLazyOpen)
+                && Objects.equals(this.blocked, that.blocked);
     }
 
     @Override
@@ -570,7 +593,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
         return Objects.hash(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, renormalizationWindowDays,
                 backgroundPersistInterval, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays, resultsRetentionDays,
                 categorizationFilters, perPartitionCategorizationConfig, customSettings, modelSnapshotId, modelSnapshotMinVersion,
-                jobVersion, clearJobFinishTime, allowLazyOpen);
+                jobVersion, clearJobFinishTime, allowLazyOpen, blocked);
     }
 
     public static class DetectorUpdate implements Writeable, ToXContentObject {
@@ -586,9 +609,9 @@ public class JobUpdate implements Writeable, ToXContentObject {
                     DetectionRule.STRICT_PARSER.apply(parser, parseFieldMatcher).build(), Detector.CUSTOM_RULES_FIELD);
         }
 
-        private int detectorIndex;
-        private String description;
-        private List<DetectionRule> rules;
+        private final int detectorIndex;
+        private final String description;
+        private final List<DetectionRule> rules;
 
         public DetectorUpdate(int detectorIndex, String description, List<DetectionRule> rules) {
             this.detectorIndex = detectorIndex;
@@ -685,6 +708,7 @@ public class JobUpdate implements Writeable, ToXContentObject {
         private Version jobVersion;
         private Boolean clearJobFinishTime;
         private Boolean allowLazyOpen;
+        private Blocked blocked;
 
         public Builder(String jobId) {
             this.jobId = jobId;
@@ -795,11 +819,16 @@ public class JobUpdate implements Writeable, ToXContentObject {
             return this;
         }
 
+        public Builder setBlocked(Blocked blocked) {
+            this.blocked = blocked;
+            return this;
+        }
+
         public JobUpdate build() {
             return new JobUpdate(jobId, groups, description, detectorUpdates, modelPlotConfig, analysisLimits, backgroundPersistInterval,
                     renormalizationWindowDays, resultsRetentionDays, modelSnapshotRetentionDays, dailyModelSnapshotRetentionAfterDays,
                     categorizationFilters, perPartitionCategorizationConfig, customSettings, modelSnapshotId, modelSnapshotMinVersion,
-                    jobVersion, clearJobFinishTime, allowLazyOpen);
+                    jobVersion, clearJobFinishTime, allowLazyOpen, blocked);
         }
     }
 }

+ 2 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java

@@ -138,6 +138,7 @@ public final class Messages {
     public static final String JOB_AUDIT_CREATED = "Job created";
     public static final String JOB_AUDIT_UPDATED = "Job updated: {0}";
     public static final String JOB_AUDIT_CLOSING = "Job is closing";
+    public static final String JOB_AUDIT_RESET = "Job has been reset";
     public static final String JOB_AUDIT_FORCE_CLOSING = "Job is closing (forced)";
     public static final String JOB_AUDIT_DATAFEED_CONTINUED_REALTIME = "Datafeed continued in real-time";
     public static final String JOB_AUDIT_DATAFEED_DATA_ANALYSIS_ERROR = "Datafeed is encountering errors submitting data for analysis: {0}";
@@ -263,6 +264,7 @@ public final class Messages {
     public static final String REST_INVALID_FLUSH_PARAMS_MISSING = "Invalid flush parameters: ''{0}'' has not been specified.";
     public static final String REST_INVALID_FLUSH_PARAMS_UNEXPECTED = "Invalid flush parameters: unexpected ''{0}''.";
     public static final String REST_JOB_NOT_CLOSED_REVERT = "Can only revert to a model snapshot when the job is closed.";
+    public static final String REST_JOB_NOT_CLOSED_RESET = "Can only reset a job when it is closed.";
     public static final String REST_NO_SUCH_MODEL_SNAPSHOT = "No model snapshot with id [{0}] exists for job [{1}]";
     public static final String REST_START_AFTER_END = "Invalid time range: end time ''{0}'' is earlier than start time ''{1}''.";
     public static final String REST_NO_SUCH_FORECAST = "No forecast(s) [{0}] exists for job [{1}]";

+ 10 - 0
x-pack/plugin/core/src/main/resources/org/elasticsearch/xpack/core/ml/config_index_mappings.json

@@ -291,6 +291,16 @@
       "background_persist_interval" : {
         "type" : "keyword"
       },
+      "blocked": {
+        "properties": {
+          "reason": {
+            "type": "keyword"
+          },
+          "task_id": {
+            "type": "keyword"
+          }
+        }
+      },
       "chunking_config" : {
         "properties" : {
           "mode" : {

+ 27 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/ResetJobRequestTests.java

@@ -0,0 +1,27 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.action;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+
+public class ResetJobRequestTests extends AbstractWireSerializingTestCase<ResetJobAction.Request> {
+
+    @Override
+    protected ResetJobAction.Request createTestInstance() {
+        ResetJobAction.Request request = new ResetJobAction.Request(randomAlphaOfLength(10));
+        request.setShouldStoreResult(randomBoolean());
+        request.setSkipJobStateValidation(randomBoolean());
+        return request;
+    }
+
+    @Override
+    protected Writeable.Reader<ResetJobAction.Request> instanceReader() {
+        return ResetJobAction.Request::new;
+    }
+}

+ 58 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/BlockedTests.java

@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.core.ml.job.config;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.test.AbstractSerializingTestCase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+
+public class BlockedTests extends AbstractSerializingTestCase<Blocked> {
+
+    @Override
+    protected Blocked doParseInstance(XContentParser parser) throws IOException {
+        return Blocked.STRICT_PARSER.apply(parser, null);
+    }
+
+    @Override
+    protected Writeable.Reader<Blocked> instanceReader() {
+        return Blocked::new;
+    }
+
+    @Override
+    protected Blocked createTestInstance() {
+        return createRandom();
+    }
+
+    public static Blocked createRandom() {
+        Blocked.Reason reason = randomFrom(Blocked.Reason.values());
+        TaskId taskId = (reason != Blocked.Reason.NONE && randomBoolean()) ?
+            new TaskId(randomAlphaOfLength(10) + ":" + randomNonNegativeLong()) : null;
+        return new Blocked(reason, taskId);
+    }
+
+    public void testReasonFromString() {
+        assertThat(Blocked.Reason.fromString("NonE"), equalTo(Blocked.Reason.NONE));
+        assertThat(Blocked.Reason.fromString("dElETe"), equalTo(Blocked.Reason.DELETE));
+        assertThat(Blocked.Reason.fromString("ReSEt"), equalTo(Blocked.Reason.RESET));
+        assertThat(Blocked.Reason.fromString("reVERt"), equalTo(Blocked.Reason.REVERT));
+    }
+
+    public void testReasonToString() {
+        List<String> asStrings = Arrays.stream(Blocked.Reason.values()).map(Blocked.Reason::toString).collect(Collectors.toList());
+        assertThat(asStrings, contains("none", "delete", "reset", "revert"));
+    }
+}

+ 40 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParseException;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.AbstractSerializingTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.ml.MachineLearningField;
@@ -594,6 +595,42 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
         assertNull(Job.extractJobIdFromDocumentId("some_other_type-foo"));
     }
 
+    public void testDeletingAndBlockReasonAreSynced() {
+        {
+            Job job = buildJobBuilder(randomValidJobId())
+                .setDeleting(true)
+                .build();
+            assertThat(job.getBlocked().getReason(), equalTo(Blocked.Reason.DELETE));
+        }
+        {
+            Job job = buildJobBuilder(randomValidJobId())
+                .setBlocked(new Blocked(Blocked.Reason.DELETE, null))
+                .build();
+            assertThat(job.isDeleting(), is(true));
+        }
+    }
+
+    public void testParseJobWithDeletingButWithoutBlockReason() throws IOException {
+        String jobWithDeleting = "{\n" +
+            "    \"job_id\": \"deleting_job\",\n" +
+            "    \"create_time\": 1234567890000,\n" +
+            "    \"analysis_config\": {\n" +
+            "        \"bucket_span\": \"1h\",\n" +
+            "        \"detectors\": [{\"function\": \"count\"}]\n" +
+            "    },\n" +
+            "    \"data_description\": {\n" +
+            "        \"time_field\": \"time\"\n" +
+            "    },\n" +
+            "    \"deleting\": true\n" +
+            "}";
+
+        try (XContentParser parser = JsonXContent.jsonXContent.createParser(
+                NamedXContentRegistry.EMPTY, DeprecationHandler.IGNORE_DEPRECATIONS, jobWithDeleting)) {
+            Job job = doParseInstance(parser);
+            assertThat(job.getBlocked().getReason(), equalTo(Blocked.Reason.DELETE));
+        }
+    }
+
     public static Job.Builder buildJobBuilder(String id, Date date) {
         Job.Builder builder = new Job.Builder(id);
         builder.setCreateTime(date);
@@ -686,6 +723,9 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
         if (randomBoolean()) {
             builder.setAllowLazyOpen(randomBoolean());
         }
+        if (randomBoolean()) {
+            builder.setBlocked(BlockedTests.createRandom());
+        }
         return builder.build();
     }
 }

+ 3 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/JobUpdateTests.java

@@ -130,6 +130,9 @@ public class JobUpdateTests extends AbstractSerializingTestCase<JobUpdate> {
         if (randomBoolean()) {
             update.setAllowLazyOpen(randomBoolean());
         }
+        if (useInternalParser && randomBoolean()) {
+            update.setBlocked(BlockedTests.createRandom());
+        }
 
         return update.build();
     }

+ 2 - 0
x-pack/plugin/ml/qa/ml-with-security/build.gradle

@@ -196,6 +196,8 @@ tasks.named("yamlRestTest").configure {
     'ml/preview_datafeed/Test preview missing datafeed',
     'ml/preview_datafeed/Test preview with datafeed_id and job config',
     'ml/preview_datafeed/Test preview with datafeed id and config',
+    'ml/reset_job/Test reset given job is open',
+    'ml/reset_job/Test reset given unknown job id',
     'ml/revert_model_snapshot/Test revert model with invalid snapshotId',
     'ml/start_data_frame_analytics/Test start given missing source index',
     'ml/start_data_frame_analytics/Test start outlier_detection given source index has no fields',

+ 7 - 3
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlDailyMaintenanceServiceIT.java

@@ -10,10 +10,12 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.core.ml.action.PutJobAction;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -65,8 +67,10 @@ public class MlDailyMaintenanceServiceIT extends MlNativeAutodetectIntegTestCase
         blockingCall(maintenanceService::triggerDeleteJobsInStateDeletingWithoutDeletionTask);
         assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3"));
 
-        this.<Boolean>blockingCall(listener -> jobConfigProvider.markJobAsDeleting("maintenance-test-2", listener));
-        this.<Boolean>blockingCall(listener -> jobConfigProvider.markJobAsDeleting("maintenance-test-3", listener));
+        this.<PutJobAction.Response>blockingCall(listener -> jobConfigProvider.updateJobBlockReason(
+            "maintenance-test-2", new Blocked(Blocked.Reason.DELETE, null), listener));
+        this.<PutJobAction.Response>blockingCall(listener -> jobConfigProvider.updateJobBlockReason(
+            "maintenance-test-3", new Blocked(Blocked.Reason.DELETE, null), listener));
         assertThat(getJobIds(), containsInAnyOrder("maintenance-test-1", "maintenance-test-2", "maintenance-test-3"));
         assertThat(getJob("maintenance-test-1").get(0).isDeleting(), is(false));
         assertThat(getJob("maintenance-test-2").get(0).isDeleting(), is(true));

+ 6 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java

@@ -43,6 +43,7 @@ import org.elasticsearch.xpack.core.ml.action.PostDataAction;
 import org.elasticsearch.xpack.core.ml.action.PutCalendarAction;
 import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
+import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
 import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
@@ -123,6 +124,11 @@ abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
         return client().execute(DeleteJobAction.INSTANCE, request).actionGet();
     }
 
+    protected AcknowledgedResponse resetJob(String jobId) {
+        ResetJobAction.Request request = new ResetJobAction.Request(jobId);
+        return client().execute(ResetJobAction.INSTANCE, request).actionGet();
+    }
+
     protected PutDatafeedAction.Response putDatafeed(DatafeedConfig datafeed) {
         PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeed);
         return client().execute(PutDatafeedAction.INSTANCE, request).actionGet();

+ 1 - 28
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeDataFrameAnalyticsIntegTestCase.java

@@ -8,12 +8,6 @@ package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.action.admin.indices.get.GetIndexAction;
 import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
-import org.elasticsearch.action.search.SearchAction;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.core.Nullable;
@@ -23,15 +17,14 @@ import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
 import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
-import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
+import org.elasticsearch.xpack.core.ml.action.PreviewDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
@@ -62,10 +55,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
-import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.arrayWithSize;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -282,24 +273,6 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
         });
     }
 
-    private static List<String> fetchAllAuditMessages(String dataFrameAnalyticsId) {
-        RefreshRequest refreshRequest = new RefreshRequest(NotificationsIndex.NOTIFICATIONS_INDEX);
-        RefreshResponse refreshResponse = client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet();
-        assertThat(refreshResponse.getStatus().getStatus(), anyOf(equalTo(200), equalTo(201)));
-
-        SearchRequest searchRequest = new SearchRequestBuilder(client(), SearchAction.INSTANCE)
-            .setIndices(NotificationsIndex.NOTIFICATIONS_INDEX)
-            .addSort("timestamp", SortOrder.ASC)
-            .setQuery(QueryBuilders.termQuery("job_id", dataFrameAnalyticsId))
-            .setSize(100)
-            .request();
-        SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet();
-
-        return Arrays.stream(searchResponse.getHits().getHits())
-            .map(hit -> (String) hit.getSourceAsMap().get("message"))
-            .collect(Collectors.toList());
-    }
-
     protected static Set<String> getTrainingRowsIds(String index) {
         Set<String> trainingRowsIds = new HashSet<>();
         SearchResponse hits = client().prepareSearch(index).setSize(10000).get();

+ 36 - 6
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java

@@ -8,17 +8,19 @@ package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateAction;
 import org.elasticsearch.action.admin.cluster.snapshots.features.ResetFeatureStateRequest;
-import org.elasticsearch.cluster.NamedDiff;
-import org.elasticsearch.xpack.autoscaling.Autoscaling;
-import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata;
-import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
-import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
-import org.elasticsearch.xpack.transform.Transform;
+import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.NamedDiff;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.Template;
@@ -29,6 +31,7 @@ import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.env.Environment;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.ReindexPlugin;
 import org.elasticsearch.ingest.common.IngestCommonPlugin;
 import org.elasticsearch.license.LicenseService;
@@ -43,10 +46,15 @@ import org.elasticsearch.script.ScoreScript;
 import org.elasticsearch.script.ScriptContext;
 import org.elasticsearch.script.ScriptEngine;
 import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.SecuritySettingsSourceField;
 import org.elasticsearch.transport.Netty4Plugin;
+import org.elasticsearch.xpack.autoscaling.Autoscaling;
+import org.elasticsearch.xpack.autoscaling.AutoscalingMetadata;
+import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
 import org.elasticsearch.xpack.core.XPackSettings;
+import org.elasticsearch.xpack.core.action.CreateDataStreamAction;
 import org.elasticsearch.xpack.core.action.DeleteDataStreamAction;
 import org.elasticsearch.xpack.core.ilm.DeleteAction;
 import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
@@ -81,6 +89,7 @@ import org.elasticsearch.xpack.ilm.IndexLifecycle;
 import org.elasticsearch.xpack.ml.LocalStateMachineLearning;
 import org.elasticsearch.xpack.ml.autoscaling.MlScalingReason;
 import org.elasticsearch.xpack.ml.inference.ModelAliasMetadata;
+import org.elasticsearch.xpack.transform.Transform;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -96,6 +105,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.XContentTestUtils.convertToMap;
 import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
@@ -103,6 +113,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
 import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
 import static org.elasticsearch.xpack.monitoring.MonitoringService.ELASTICSEARCH_COLLECTION_ENABLED;
 import static org.elasticsearch.xpack.security.test.SecurityTestUtils.writeFile;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 
 /**
@@ -245,6 +257,24 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
         return client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet();
     }
 
+    protected static List<String> fetchAllAuditMessages(String jobId) {
+        RefreshRequest refreshRequest = new RefreshRequest(NotificationsIndex.NOTIFICATIONS_INDEX);
+        RefreshResponse refreshResponse = client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet();
+        assertThat(refreshResponse.getStatus().getStatus(), anyOf(equalTo(200), equalTo(201)));
+
+        SearchRequest searchRequest = new SearchRequestBuilder(client(), SearchAction.INSTANCE)
+            .setIndices(NotificationsIndex.NOTIFICATIONS_INDEX)
+            .addSort("timestamp", SortOrder.ASC)
+            .setQuery(QueryBuilders.termQuery("job_id", jobId))
+            .setSize(100)
+            .request();
+        SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet();
+
+        return Arrays.stream(searchResponse.getHits().getHits())
+            .map(hit -> (String) hit.getSourceAsMap().get("message"))
+            .collect(Collectors.toList());
+    }
+
     @Override
     protected void ensureClusterStateConsistency() throws IOException {
         if (cluster() != null && cluster().size() > 0) {

+ 84 - 0
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ResetJobIT.java

@@ -0,0 +1,84 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.integration;
+
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
+import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
+import org.elasticsearch.xpack.core.ml.job.config.Detector;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
+import org.elasticsearch.xpack.core.ml.job.results.Bucket;
+import org.junit.After;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+public class ResetJobIT extends MlNativeAutodetectIntegTestCase {
+
+    @After
+    public void tearDownData() {
+        cleanUp();
+    }
+
+    public void testReset() throws IOException {
+        TimeValue bucketSpan = TimeValue.timeValueMinutes(30);
+        long startTime = 1514764800000L;
+        final int bucketCount = 100;
+        Job.Builder job = createJob("test-reset", bucketSpan);
+
+        openJob(job.getId());
+        postData(job.getId(), generateData(startTime, bucketSpan, bucketCount + 1, bucketIndex -> randomIntBetween(100, 200))
+            .stream().collect(Collectors.joining()));
+        closeJob(job.getId());
+
+        List<Bucket> buckets = getBuckets(job.getId());
+        assertThat(buckets.isEmpty(), is(false));
+
+        DataCounts dataCounts = getJobStats(job.getId()).get(0).getDataCounts();
+        assertThat(dataCounts.getProcessedRecordCount(), greaterThan(0L));
+
+        resetJob(job.getId());
+
+        buckets = getBuckets(job.getId());
+        assertThat(buckets.isEmpty(), is(true));
+
+        dataCounts = getJobStats(job.getId()).get(0).getDataCounts();
+        assertThat(dataCounts.getProcessedRecordCount(), equalTo(0L));
+
+        Job jobAfterReset = getJob(job.getId()).get(0);
+        assertThat(jobAfterReset.getBlocked(), equalTo(Blocked.none()));
+        assertThat(jobAfterReset.getModelSnapshotId(), is(nullValue()));
+        assertThat(jobAfterReset.getFinishedTime(), is(nullValue()));
+
+        List<String> auditMessages = fetchAllAuditMessages(job.getId());
+        assertThat(auditMessages.isEmpty(), is(false));
+        assertThat(auditMessages.get(auditMessages.size() - 1), equalTo("Job has been reset"));
+    }
+
+    private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
+        Detector.Builder detector = new Detector.Builder("count", null);
+        AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
+        analysisConfig.setBucketSpan(bucketSpan);
+        Job.Builder job = new Job.Builder(jobId);
+        job.setAnalysisConfig(analysisConfig);
+        DataDescription.Builder dataDescription = new DataDescription.Builder();
+        job.setDataDescription(dataDescription);
+        putJob(job);
+
+        return job;
+    }
+}
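
The `resetJob(...)` call in the test above is a new helper on `MlNativeAutodetectIntegTestCase`; its body is not shown in this listing. A minimal sketch of such a helper, assuming it simply wraps the new action in the same style as the other test helpers, could look like this:

```java
// Hypothetical sketch only; the real helper lives in MlNativeAutodetectIntegTestCase
// and may differ in detail.
protected AcknowledgedResponse resetJob(String jobId) {
    ResetJobAction.Request request = new ResetJobAction.Request(jobId);
    return client().execute(ResetJobAction.INSTANCE, request).actionGet();
}
```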

+ 1 - 1
x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/ScheduledEventsIT.java

@@ -384,7 +384,7 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
         assertEquals(0, buckets.get(2).getScheduledEvents().size());
     }
 
-        private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
+    private Job.Builder createJob(String jobId, TimeValue bucketSpan) {
         Detector.Builder detector = new Detector.Builder("count", null);
         AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(detector.build()));
         analysisConfig.setBucketSpan(bucketSpan);

+ 16 - 10
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/JobConfigProviderIT.java

@@ -21,8 +21,10 @@ import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
 import org.elasticsearch.xpack.core.ml.MlConfigIndex;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
+import org.elasticsearch.xpack.core.ml.action.PutJobAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -415,8 +417,9 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
         putJob(createJob("foo-deleting", null));
         putJob(createJob("bar", null));
 
-        Boolean marked = blockingCall(actionListener -> jobConfigProvider.markJobAsDeleting("foo-deleting", actionListener));
-        assertTrue(marked);
+        PutJobAction.Response marked = blockingCall(actionListener -> jobConfigProvider.updateJobBlockReason(
+            "foo-deleting", new Blocked(Blocked.Reason.DELETE, null), actionListener));
+        assertThat(marked.getResponse().getBlocked().getReason(), equalTo(Blocked.Reason.DELETE));
 
         client().admin().indices().prepareRefresh(MlConfigIndex.indexName()).get();
 
@@ -559,27 +562,30 @@ public class JobConfigProviderIT extends MlSingleNodeTestCase {
         assertEquals(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, exceptionHolder.get().getMessage());
     }
 
-    public void testMarkAsDeleting() throws Exception {
-        AtomicReference<Boolean> responseHolder = new AtomicReference<>();
+    public void testUpdateJobBlockReason() throws Exception {
+        AtomicReference<PutJobAction.Response> responseHolder = new AtomicReference<>();
         AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
 
-        blockingCall(listener -> jobConfigProvider.markJobAsDeleting("missing-job", listener), responseHolder, exceptionHolder);
+        blockingCall(listener -> jobConfigProvider.updateJobBlockReason(
+            "missing-job", new Blocked(Blocked.Reason.RESET, null), listener), responseHolder, exceptionHolder);
         assertNull(responseHolder.get());
         assertEquals(ResourceNotFoundException.class, exceptionHolder.get().getClass());
 
-        String jobId = "mark-as-deleting-job";
+        String jobId = "update-job-blocked-reset";
         putJob(createJob(jobId, Collections.emptyList()));
         client().admin().indices().prepareRefresh(MlConfigIndex.indexName()).get();
 
         exceptionHolder.set(null);
-        blockingCall(listener -> jobConfigProvider.markJobAsDeleting(jobId, listener), responseHolder, exceptionHolder);
+        blockingCall(listener -> jobConfigProvider.updateJobBlockReason(
+            jobId, new Blocked(Blocked.Reason.RESET, null), listener), responseHolder, exceptionHolder);
         assertNull(exceptionHolder.get());
-        assertTrue(responseHolder.get());
+        assertThat(responseHolder.get().getResponse().getBlocked().getReason(), equalTo(Blocked.Reason.RESET));
 
         // repeat the update for good measure
-        blockingCall(listener -> jobConfigProvider.markJobAsDeleting(jobId, listener), responseHolder, exceptionHolder);
-        assertTrue(responseHolder.get());
+        blockingCall(listener -> jobConfigProvider.updateJobBlockReason(
+            jobId, new Blocked(Blocked.Reason.RESET, null), listener), responseHolder, exceptionHolder);
         assertNull(exceptionHolder.get());
+        assertThat(responseHolder.get().getResponse().getBlocked().getReason(), equalTo(Blocked.Reason.RESET));
     }
 
     private static Job.Builder createJob(String jobId, List<String> groups) {
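
The `Blocked` value exercised above is used throughout the remaining files. As a reading aid, here is a rough sketch of its surface, inferred from the call sites in this diff; the real class in `Blocked.java` additionally handles wire and XContent serialization, which is omitted:

```java
import org.elasticsearch.tasks.TaskId;

// Approximate shape only, inferred from how Blocked is constructed and read in this commit.
public class Blocked {

    public enum Reason { NONE, DELETE, RESET, REVERT }

    private final Reason reason;
    private final TaskId taskId;   // nullable; no task is associated when the reason is NONE

    public Blocked(Reason reason, TaskId taskId) {
        this.reason = reason;
        this.taskId = taskId;
    }

    public static Blocked none() {
        return new Blocked(Reason.NONE, null);
    }

    public Reason getReason() {
        return reason;
    }

    public TaskId getTaskId() {
        return taskId;
    }
}
```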

+ 1 - 1
x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java

@@ -523,7 +523,7 @@ public class MlDistributedFailureIT extends BaseMlIntegTestCase {
 
         assertBusy(() -> {
             DataCounts dataCounts = getJobStats(jobId).getDataCounts();
-            assertThat(dataCounts.getProcessedRecordCount(), greaterThanOrEqualTo(numDocs));
+            assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs));
             assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
         });
 

+ 5 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

@@ -141,6 +141,7 @@ import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
 import org.elasticsearch.xpack.core.ml.action.PutJobAction;
 import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAction;
 import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAliasAction;
+import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.SetResetModeAction;
 import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
@@ -228,6 +229,7 @@ import org.elasticsearch.xpack.ml.action.TransportPutFilterAction;
 import org.elasticsearch.xpack.ml.action.TransportPutJobAction;
 import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAction;
 import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAliasAction;
+import org.elasticsearch.xpack.ml.action.TransportResetJobAction;
 import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction;
 import org.elasticsearch.xpack.ml.action.TransportSetResetModeAction;
 import org.elasticsearch.xpack.ml.action.TransportSetUpgradeModeAction;
@@ -368,6 +370,7 @@ import org.elasticsearch.xpack.ml.rest.job.RestOpenJobAction;
 import org.elasticsearch.xpack.ml.rest.job.RestPostDataAction;
 import org.elasticsearch.xpack.ml.rest.job.RestPostJobUpdateAction;
 import org.elasticsearch.xpack.ml.rest.job.RestPutJobAction;
+import org.elasticsearch.xpack.ml.rest.job.RestResetJobAction;
 import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestDeleteModelSnapshotAction;
 import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestGetModelSnapshotsAction;
 import org.elasticsearch.xpack.ml.rest.modelsnapshots.RestRevertModelSnapshotAction;
@@ -932,6 +935,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
             new RestPostDataAction(),
             new RestCloseJobAction(),
             new RestFlushJobAction(),
+            new RestResetJobAction(),
             new RestValidateDetectorAction(),
             new RestValidateJobConfigAction(),
             new RestEstimateModelMemoryAction(),
@@ -1018,6 +1022,7 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin,
                 new ActionHandler<>(CloseJobAction.INSTANCE, TransportCloseJobAction.class),
                 new ActionHandler<>(FinalizeJobExecutionAction.INSTANCE, TransportFinalizeJobExecutionAction.class),
                 new ActionHandler<>(FlushJobAction.INSTANCE, TransportFlushJobAction.class),
+                new ActionHandler<>(ResetJobAction.INSTANCE, TransportResetJobAction.class),
                 new ActionHandler<>(ValidateDetectorAction.INSTANCE, TransportValidateDetectorAction.class),
                 new ActionHandler<>(ValidateJobConfigAction.INSTANCE, TransportValidateJobConfigAction.class),
                 new ActionHandler<>(EstimateModelMemoryAction.INSTANCE, TransportEstimateModelMemoryAction.class),
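
With the REST handler and transport action wired up above, the endpoint can be exercised from Java through the low-level REST client (`org.elasticsearch.client.Request`, `Response`, `RestClient`). The sketch below is illustrative only: it assumes the path follows the `ml.reset_job.json` spec added by this commit, uses a placeholder job id, and expects an already-built `RestClient`:

```java
// Illustrative sketch, not part of this commit; "my-job" is a placeholder id.
static Response resetJob(RestClient restClient) throws IOException {
    Request request = new Request("POST", "/_ml/anomaly_detectors/my-job/_reset");
    // Return a task id for monitoring instead of blocking until the reset completes.
    request.addParameter("wait_for_completion", "false");
    return restClient.performRequest(request);
}
```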

+ 46 - 7
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java

@@ -11,6 +11,8 @@ import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
+import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
@@ -21,10 +23,10 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.core.CheckedConsumer;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.Task;
@@ -34,6 +36,10 @@ import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
 import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
+import org.elasticsearch.xpack.core.ml.action.PutJobAction;
+import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
@@ -144,7 +150,7 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
                 }
         );
 
-        ActionListener<Boolean> markAsDeletingListener = ActionListener.wrap(
+        ActionListener<PutJobAction.Response> markAsDeletingListener = ActionListener.wrap(
                 response -> {
                     if (request.isForce()) {
                         forceDeleteJob(parentTaskClient, request, finalListener);
@@ -157,7 +163,7 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
         ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
             response -> {
                 auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId));
-                markJobAsDeletingIfNotUsed(request.getJobId(), markAsDeletingListener);
+                markJobAsDeletingIfNotUsed(request.getJobId(), taskId, markAsDeletingListener);
             },
             e -> {
                 if (request.isForce()
@@ -229,7 +235,7 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
 
         // Step 1. Delete the physical storage
         new JobDataDeleter(parentTaskClient, jobId).deleteJobDocuments(
-            jobId, jobConfigProvider, indexNameExpressionResolver, clusterService.state(), removeFromCalendarsHandler, listener::onFailure);
+            jobConfigProvider, indexNameExpressionResolver, clusterService.state(), removeFromCalendarsHandler, listener::onFailure);
     }
 
     private void forceDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request,
@@ -304,7 +310,7 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
         }
     }
 
-    private void markJobAsDeletingIfNotUsed(String jobId, ActionListener<Boolean> listener) {
+    private void markJobAsDeletingIfNotUsed(String jobId, TaskId taskId, ActionListener<PutJobAction.Response> listener) {
 
         datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
                 datafeedIds -> {
@@ -313,9 +319,42 @@ public class TransportDeleteJobAction extends AcknowledgedTransportMasterNodeAct
                                 + datafeedIds.iterator().next() + "] refers to it"));
                         return;
                     }
-                    jobConfigProvider.markJobAsDeleting(jobId, listener);
+                    cancelResetTaskIfExists(jobId, ActionListener.wrap(
+                        response -> jobConfigProvider.updateJobBlockReason(jobId, new Blocked(Blocked.Reason.DELETE, taskId), listener),
+                        listener::onFailure
+                    ));
                 },
                 listener::onFailure
         ));
     }
+
+    private void cancelResetTaskIfExists(String jobId, ActionListener<Boolean> listener) {
+        ActionListener<Job.Builder> jobListener = ActionListener.wrap(
+            jobBuilder -> {
+                Job job = jobBuilder.build();
+                if (job.getBlocked().getReason() == Blocked.Reason.RESET) {
+                    logger.info("[{}] Cancelling reset task [{}] because delete was requested", jobId, job.getBlocked().getTaskId());
+                    CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
+                    cancelTasksRequest.setReason("deleting job");
+                    cancelTasksRequest.setActions(ResetJobAction.NAME);
+                    cancelTasksRequest.setTaskId(job.getBlocked().getTaskId());
+                    executeAsyncWithOrigin(client, ML_ORIGIN, CancelTasksAction.INSTANCE, cancelTasksRequest, ActionListener.wrap(
+                        cancelTasksResponse -> listener.onResponse(true),
+                        e -> {
+                            if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
+                                listener.onResponse(true);
+                            } else {
+                                listener.onFailure(e);
+                            }
+                        }
+                    ));
+                } else {
+                    listener.onResponse(false);
+                }
+            },
+            listener::onFailure
+        );
+
+        jobConfigProvider.getJob(jobId, jobListener);
+    }
 }

+ 212 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetJobAction.java

@@ -0,0 +1,212 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.action;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
+import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.ParentTaskAssigningClient;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.core.CheckedConsumer;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.tasks.TaskResult;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ml.MlMetadata;
+import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.config.JobState;
+import org.elasticsearch.xpack.core.ml.job.messages.Messages;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
+import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
+import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
+
+import java.util.Objects;
+
+import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+
+public class TransportResetJobAction extends AcknowledgedTransportMasterNodeAction<ResetJobAction.Request> {
+
+    private static final Logger logger = LogManager.getLogger(TransportResetJobAction.class);
+
+    private final Client client;
+    private final JobConfigProvider jobConfigProvider;
+    private final JobResultsProvider jobResultsProvider;
+    private final AnomalyDetectionAuditor auditor;
+
+    @Inject
+    public TransportResetJobAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
+                                   ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client,
+                                   JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider,
+                                   AnomalyDetectionAuditor auditor) {
+        super(ResetJobAction.NAME, transportService, clusterService, threadPool, actionFilters, ResetJobAction.Request::new,
+            indexNameExpressionResolver, ThreadPool.Names.SAME);
+        this.client = Objects.requireNonNull(client);
+        this.jobConfigProvider = Objects.requireNonNull(jobConfigProvider);
+        this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
+        this.auditor = Objects.requireNonNull(auditor);
+    }
+
+    @Override
+    protected void masterOperation(Task task, ResetJobAction.Request request, ClusterState state,
+                                   ActionListener<AcknowledgedResponse> listener) throws Exception {
+        if (MlMetadata.getMlMetadata(state).isUpgradeMode()) {
+            listener.onFailure(ExceptionsHelper.conflictStatusException("cannot reset job while indices are being upgraded"));
+            return;
+        }
+
+        final TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
+
+        ActionListener<Job.Builder> jobListener = ActionListener.wrap(
+            jobBuilder -> {
+                Job job = jobBuilder.build();
+                PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
+                JobState jobState = MlTasks.getJobState(job.getId(), tasks);
+                if (request.isSkipJobStateValidation() == false && jobState != JobState.CLOSED) {
+                    listener.onFailure(ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_RESET)));
+                    return;
+                }
+                if (job.getBlocked().getReason() != Blocked.Reason.NONE && job.getBlocked().getReason() != Blocked.Reason.RESET) {
+                    listener.onFailure(ExceptionsHelper.conflictStatusException(
+                        "cannot reset job while it is blocked with [" + job.getBlocked().getReason() + "]"));
+                    return;
+                }
+
+                if (job.getBlocked().getReason() == Blocked.Reason.RESET) {
+                    waitExistingResetTaskToComplete(job.getBlocked().getTaskId(), request, listener);
+                } else {
+                    ParentTaskAssigningClient taskClient = new ParentTaskAssigningClient(client, taskId);
+                    jobConfigProvider.updateJobBlockReason(job.getId(), new Blocked(Blocked.Reason.RESET, taskId), ActionListener.wrap(
+                        r -> resetJob(taskClient, (CancellableTask) task, request, listener),
+                        listener::onFailure
+                    ));
+                }
+            },
+            listener::onFailure
+        );
+
+        jobConfigProvider.getJob(request.getJobId(), jobListener);
+    }
+
+    private void waitExistingResetTaskToComplete(TaskId existingTaskId, ResetJobAction.Request request,
+                                                 ActionListener<AcknowledgedResponse> listener) {
+        logger.debug(() -> new ParameterizedMessage(
+            "[{}] Waiting on existing reset task: {}", request.getJobId(), existingTaskId));
+        GetTaskRequest getTaskRequest = new GetTaskRequest();
+        getTaskRequest.setTaskId(existingTaskId);
+        getTaskRequest.setWaitForCompletion(true);
+        getTaskRequest.setTimeout(request.timeout());
+        executeAsyncWithOrigin(client, ML_ORIGIN, GetTaskAction.INSTANCE, getTaskRequest, ActionListener.wrap(
+            getTaskResponse -> {
+                TaskResult taskResult = getTaskResponse.getTask();
+                if (taskResult.isCompleted()) {
+                    listener.onResponse(AcknowledgedResponse.of(true));
+                } else {
+                    BytesReference taskError = taskResult.getError();
+                    if (taskError != null) {
+                        listener.onFailure(ExceptionsHelper.serverError("reset failed to complete; error [{}]",
+                            taskError.utf8ToString()));
+                    } else {
+                        listener.onFailure(ExceptionsHelper.serverError("reset failed to complete"));
+                    }
+                }
+            },
+            listener::onFailure
+        ));
+    }
+
+    private void resetJob(ParentTaskAssigningClient taskClient, CancellableTask task, ResetJobAction.Request request,
+                          ActionListener<AcknowledgedResponse> listener) {
+        String jobId = request.getJobId();
+
+        // Now that we have updated the job's block reason, we should check again
+        // if the job has been opened.
+        PersistentTasksCustomMetadata tasks = clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
+        JobState jobState = MlTasks.getJobState(jobId, tasks);
+        if (request.isSkipJobStateValidation() == false && jobState != JobState.CLOSED) {
+            jobConfigProvider.updateJobBlockReason(jobId, null, ActionListener.wrap(
+                clearResetResponse -> listener.onFailure(ExceptionsHelper.conflictStatusException(
+                    Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_RESET))),
+                e -> listener.onFailure(ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_RESET)))
+            ));
+            return;
+        }
+
+        logger.info("[{}] Resetting job", jobId);
+
+        ActionListener<Boolean> resultsIndexCreatedListener = ActionListener.wrap(
+            resultsIndexCreatedResponse -> {
+                if (task.isCancelled()) {
+                    listener.onResponse(AcknowledgedResponse.of(false));
+                    return;
+                }
+                finishSuccessfulReset(jobId, listener);
+            },
+            listener::onFailure
+        );
+
+        CheckedConsumer<Boolean, Exception> jobDocsDeletionListener = response -> {
+            if (task.isCancelled()) {
+                listener.onResponse(AcknowledgedResponse.of(false));
+                return;
+            }
+            jobConfigProvider.getJob(jobId, ActionListener.wrap(
+                jobBuilder -> {
+                    if (task.isCancelled()) {
+                        listener.onResponse(AcknowledgedResponse.of(false));
+                        return;
+                    }
+                    jobResultsProvider.createJobResultIndex(
+                        jobBuilder.build(), clusterService.state(), resultsIndexCreatedListener);
+                },
+                listener::onFailure
+            ));
+        };
+
+        JobDataDeleter jobDataDeleter = new JobDataDeleter(taskClient, jobId);
+        jobDataDeleter.deleteJobDocuments(jobConfigProvider, indexNameExpressionResolver,
+            clusterService.state(), jobDocsDeletionListener, listener::onFailure);
+    }
+
+    private void finishSuccessfulReset(String jobId, ActionListener<AcknowledgedResponse> listener) {
+        jobConfigProvider.updateJobAfterReset(jobId, ActionListener.wrap(
+            blockReasonUpdatedResponse -> {
+                logger.info("[{}] Reset has successfully completed", jobId);
+                auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_RESET));
+                listener.onResponse(AcknowledgedResponse.of(true));
+            },
+            listener::onFailure
+        ));
+    }
+
+    @Override
+    protected ClusterBlockException checkBlock(ResetJobAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+}

+ 98 - 25
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportRevertModelSnapshotAction.java

@@ -10,6 +10,8 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
+import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.Client;
@@ -22,6 +24,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.MlConfigIndex;
@@ -29,6 +32,8 @@ import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.annotations.Annotation;
 import org.elasticsearch.xpack.core.ml.annotations.AnnotationIndex;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
 import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@@ -45,6 +50,9 @@ import java.util.Date;
 import java.util.Set;
 import java.util.function.Consumer;
 
+import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
+import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
+
 public class TransportRevertModelSnapshotAction extends TransportMasterNodeAction<RevertModelSnapshotAction.Request,
         RevertModelSnapshotAction.Response> {
 
@@ -75,6 +83,7 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
     protected void masterOperation(Task task, RevertModelSnapshotAction.Request request, ClusterState state,
                                    ActionListener<RevertModelSnapshotAction.Response> listener) {
         final String jobId = request.getJobId();
+        final TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
 
         if (migrationEligibilityCheck.jobIsEligibleForMigration(jobId, state)) {
             listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("revert model snapshot", jobId));
@@ -87,32 +96,44 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
         // 5. Revert the state
         ActionListener<Boolean> annotationsIndexUpdateListener = ActionListener.wrap(
             r -> {
-                PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
-                JobState jobState = MlTasks.getJobState(jobId, tasks);
-
-                if (request.isForce() == false && jobState.equals(JobState.CLOSED) == false) {
-                    listener.onFailure(ExceptionsHelper.conflictStatusException(Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT)));
-                    return;
-                }
-
-                if (MlTasks.getSnapshotUpgraderTask(jobId, request.getSnapshotId(), tasks) != null) {
-                    listener.onFailure(ExceptionsHelper.conflictStatusException(
-                        "Cannot revert job [{}] to snapshot [{}] as it is being upgraded",
-                        jobId,
-                        request.getSnapshotId()
-                    ));
-                    return;
-                }
+                ActionListener<Job> jobListener = ActionListener.wrap(
+                    job -> {
+                        PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
+                        JobState jobState = MlTasks.getJobState(job.getId(), tasks);
+                        if (request.isForce() == false && jobState.equals(JobState.CLOSED) == false) {
+                            listener.onFailure(ExceptionsHelper.conflictStatusException(
+                                Messages.getMessage(Messages.REST_JOB_NOT_CLOSED_REVERT)));
+                            return;
+                        }
+                        if (MlTasks.getSnapshotUpgraderTask(jobId, request.getSnapshotId(), tasks) != null) {
+                            listener.onFailure(ExceptionsHelper.conflictStatusException(
+                                "Cannot revert job [{}] to snapshot [{}] as it is being upgraded",
+                                jobId,
+                                request.getSnapshotId()
+                            ));
+                            return;
+                        }
+                        isBlocked(job, ActionListener.wrap(
+                            isBlocked -> {
+                                if (isBlocked) {
+                                    listener.onFailure(ExceptionsHelper.conflictStatusException(
+                                        "cannot revert job [{}] to snapshot [{}] while it is blocked with [{}]",
+                                        jobId, request.getSnapshotId(), job.getBlocked().getReason())
+                                    );
+                                } else {
+                                    jobManager.updateJobBlockReason(jobId, new Blocked(Blocked.Reason.REVERT, taskId), ActionListener.wrap(
+                                        aBoolean -> revertSnapshot(jobId, request, listener),
+                                        listener::onFailure
+                                    ));
+                                }
+                            },
+                            listener::onFailure
+                        ));
+                    },
+                    listener::onFailure
+                );
 
-                getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
-                    ActionListener<RevertModelSnapshotAction.Response> wrappedListener = listener;
-                    if (request.getDeleteInterveningResults()) {
-                        wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, jobId);
-                        wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, jobId);
-                        wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, jobId);
-                    }
-                    jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
-                }, listener::onFailure);
+                jobManager.getJob(jobId, jobListener);
             },
             listener::onFailure
         );
@@ -142,6 +163,58 @@ public class TransportRevertModelSnapshotAction extends TransportMasterNodeActio
             createStateIndexListener);
     }
 
+    private void isBlocked(Job job, ActionListener<Boolean> listener) {
+        if (job.getBlocked().getReason() == Blocked.Reason.NONE) {
+            listener.onResponse(false);
+            return;
+        }
+        if (job.getBlocked().getReason() == Blocked.Reason.REVERT) {
+            // If another revert is requested while a revert task is already
+            // running, we do not allow it to run. However, if the job got stuck
+            // with a block on revert, it means the node that was running the
+            // previous revert failed. So, if that task has completed, we allow a
+            // new revert to run so that it can finish and eventually unblock the job.
+            GetTaskRequest getTaskRequest = new GetTaskRequest();
+            getTaskRequest.setTaskId(job.getBlocked().getTaskId());
+            executeAsyncWithOrigin(client, ML_ORIGIN, GetTaskAction.INSTANCE, getTaskRequest, ActionListener.wrap(
+                r -> listener.onResponse(r.getTask().isCompleted() == false),
+                e -> {
+                    if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
+                        listener.onResponse(false);
+                    } else {
+                        listener.onFailure(e);
+                    }
+                }
+            ));
+        } else {
+            listener.onResponse(true);
+        }
+    }
+
+    private void revertSnapshot(String jobId, RevertModelSnapshotAction.Request request,
+                                ActionListener<RevertModelSnapshotAction.Response> listener) {
+        ActionListener<RevertModelSnapshotAction.Response> finalListener = ActionListener.wrap(
+            r -> jobManager.updateJobBlockReason(jobId, Blocked.none(), ActionListener.wrap(
+                    aBoolean -> listener.onResponse(r),
+                    listener::onFailure
+                ))
+            , e -> jobManager.updateJobBlockReason(jobId, Blocked.none(), ActionListener.wrap(
+                aBoolean -> listener.onFailure(e),
+                listener::onFailure
+            ))
+        );
+
+        getModelSnapshot(request, jobResultsProvider, modelSnapshot -> {
+            ActionListener<RevertModelSnapshotAction.Response> wrappedListener = finalListener;
+            if (request.getDeleteInterveningResults()) {
+                wrappedListener = wrapDeleteOldAnnotationsListener(wrappedListener, modelSnapshot, jobId);
+                wrappedListener = wrapDeleteOldDataListener(wrappedListener, modelSnapshot, jobId);
+                wrappedListener = wrapRevertDataCountsListener(wrappedListener, modelSnapshot, jobId);
+            }
+            jobManager.revertSnapshot(request, wrappedListener, modelSnapshot);
+        }, listener::onFailure);
+    }
+
     private void getModelSnapshot(RevertModelSnapshotAction.Request request, JobResultsProvider provider, Consumer<ModelSnapshot> handler,
                                   Consumer<Exception> errorHandler) {
         logger.info("Reverting to snapshot '" + request.getSnapshotId() + "'");

+ 6 - 1
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

@@ -18,7 +18,6 @@ import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.logging.DeprecationCategory;
 import org.elasticsearch.common.logging.DeprecationLogger;
@@ -28,6 +27,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
@@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
 import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
 import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
@@ -661,4 +662,8 @@ public class JobManager {
             actionListener::onFailure
         ));
     }
+
+    public void updateJobBlockReason(String jobId, Blocked blocked, ActionListener<PutJobAction.Response> listener) {
+        jobConfigProvider.updateJobBlockReason(jobId, blocked, listener);
+    }
 }

+ 43 - 51
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java

@@ -9,7 +9,6 @@ package org.elasticsearch.xpack.ml.job.persistence;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.search.join.ScoreMode;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
@@ -27,10 +26,7 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.update.UpdateAction;
-import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.core.Nullable;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.regex.Regex;
@@ -42,8 +38,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.engine.DocumentMissingException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.ExistsQueryBuilder;
@@ -60,12 +56,16 @@ import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
 import org.elasticsearch.xpack.core.ml.MlConfigIndex;
 import org.elasticsearch.xpack.core.ml.MlTasks;
+import org.elasticsearch.xpack.core.ml.action.PutJobAction;
+import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.ml.utils.MlStrings;
 import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
@@ -273,17 +273,16 @@ public class JobConfigProvider {
      * @param update The job update
      * @param maxModelMemoryLimit The maximum model memory allowed
      * @param validator The job update validator
-     * @param updatedJobListener Updated job listener
+     * @param listener Updated job listener
      */
     public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
-                                        UpdateValidator validator, ActionListener<Job> updatedJobListener) {
+                                        UpdateValidator validator, ActionListener<Job> listener) {
         GetRequest getRequest = new GetRequest(MlConfigIndex.indexName(), Job.documentId(jobId));
 
-        executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, new ActionListener.Delegating<>(updatedJobListener) {
-            @Override
-            public void onResponse(GetResponse getResponse) {
+        executeAsyncWithOrigin(client, ML_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(
+            getResponse -> {
                 if (getResponse.isExists() == false) {
-                    delegate.onFailure(ExceptionsHelper.missingJobException(jobId));
+                    listener.onFailure(ExceptionsHelper.missingJobException(jobId));
                     return;
                 }
 
@@ -294,27 +293,34 @@ public class JobConfigProvider {
                 try {
                     originalJob = parseJobLenientlyFromSource(source).build();
                 } catch (Exception e) {
-                    delegate.onFailure(new ElasticsearchParseException("Failed to parse job configuration [" + jobId + "]", e));
+                    listener.onFailure(new ElasticsearchParseException("Failed to parse job configuration [" + jobId + "]", e));
                     return;
                 }
 
                 validator.validate(originalJob, update, ActionListener.wrap(
-                        validated  -> {
-                            Job updatedJob;
-                            try {
-                                // Applying the update may result in a validation error
-                                updatedJob = update.mergeWithJob(originalJob, maxModelMemoryLimit);
-                            } catch (Exception e) {
-                                delegate.onFailure(e);
-                                return;
-                            }
+                    validated  -> {
+                        Job updatedJob;
+                        try {
+                            // Applying the update may result in a validation error
+                            updatedJob = update.mergeWithJob(originalJob, maxModelMemoryLimit);
+                        } catch (Exception e) {
+                            listener.onFailure(e);
+                            return;
+                        }
 
-                            indexUpdatedJob(updatedJob, seqNo, primaryTerm, delegate);
-                        },
-                        delegate::onFailure
+                        indexUpdatedJob(updatedJob, seqNo, primaryTerm, listener);
+                    },
+                    listener::onFailure
                 ));
+            },
+            e -> {
+                if (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) {
+                    listener.onFailure(ExceptionsHelper.missingJobException(jobId));
+                } else {
+                    listener.onFailure(e);
+                }
             }
-        });
+        ));
     }
 
     private void indexUpdatedJob(Job updatedJob, long seqNo, long primaryTerm,
@@ -425,32 +431,18 @@ public class JobConfigProvider {
                 , client::search);
     }
 
-    /**
-     * Sets the job's {@code deleting} field to true
-     * @param jobId     The job to mark as deleting
-     * @param listener  Responds with true if successful else an error
-     */
-    public void markJobAsDeleting(String jobId, ActionListener<Boolean> listener) {
-        UpdateRequest updateRequest = new UpdateRequest(MlConfigIndex.indexName(), Job.documentId(jobId));
-        updateRequest.retryOnConflict(3);
-        updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        updateRequest.doc(Collections.singletonMap(Job.DELETING.getPreferredName(), Boolean.TRUE));
-
-        executeAsyncWithOrigin(client, ML_ORIGIN, UpdateAction.INSTANCE, updateRequest, ActionListener.wrap(
-               response -> {
-                   assert (response.getResult() == DocWriteResponse.Result.UPDATED) ||
-                           (response.getResult() == DocWriteResponse.Result.NOOP);
-                   listener.onResponse(Boolean.TRUE);
-               },
-               e -> {
-                   ElasticsearchException[] causes = ElasticsearchException.guessRootCauses(e);
-                   if (causes[0] instanceof DocumentMissingException) {
-                       listener.onFailure(ExceptionsHelper.missingJobException(jobId));
-                   } else {
-                       listener.onFailure(e);
-                   }
-               }
-        ));
+    public void updateJobBlockReason(String jobId, Blocked blocked, ActionListener<PutJobAction.Response> listener) {
+        JobUpdate jobUpdate = new JobUpdate.Builder(jobId).setBlocked(blocked).build();
+        executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, UpdateJobAction.Request.internal(jobId, jobUpdate), listener);
+    }
+
+    public void updateJobAfterReset(String jobId, ActionListener<PutJobAction.Response> listener) {
+        JobUpdate jobUpdate = new JobUpdate.Builder(jobId)
+            .setModelSnapshotId(ModelSnapshot.EMPTY_SNAPSHOT_ID)
+            .setBlocked(Blocked.none())
+            .setClearFinishTime(true)
+            .build();
+        executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, UpdateJobAction.Request.internal(jobId, jobUpdate), listener);
     }
 
     /**

+ 3 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobDataDeleter.java

@@ -244,9 +244,9 @@ public class JobDataDeleter {
     /**
      * Deletes all documents associated with a job except user annotations and notifications
      */
-    public void deleteJobDocuments(String jobId, JobConfigProvider jobConfigProvider,
-                                   IndexNameExpressionResolver indexNameExpressionResolver, ClusterState clusterState,
-                                   CheckedConsumer<Boolean, Exception> finishedHandler, Consumer<Exception> failureHandler) {
+    public void deleteJobDocuments(JobConfigProvider jobConfigProvider, IndexNameExpressionResolver indexNameExpressionResolver,
+                                   ClusterState clusterState, CheckedConsumer<Boolean, Exception> finishedHandler,
+                                   Consumer<Exception> failureHandler) {
 
         AtomicReference<String[]> indexNames = new AtomicReference<>();
 

+ 33 - 13
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

@@ -35,7 +35,9 @@ import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.FinalizeJobExecutionAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsAction;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
+import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
 import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
@@ -165,8 +167,9 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
         if (job == null) {
             throw ExceptionsHelper.missingJobException(jobId);
         }
-        if (job.isDeleting()) {
-            throw ExceptionsHelper.conflictStatusException("Cannot open job [{}] because it is being deleted", jobId);
+        if (job.getBlocked().getReason() != Blocked.Reason.NONE) {
+            throw ExceptionsHelper.conflictStatusException("Cannot open job [{}] because it is executing [{}]", jobId,
+                job.getBlocked().getReason());
         }
         if (job.getJobVersion() == null) {
             throw ExceptionsHelper.badRequestException(
@@ -239,8 +242,7 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
 
         ActionListener<Boolean> hasRunningDatafeedTaskListener = ActionListener.wrap(
             hasRunningDatafeed -> {
-                if (hasRunningDatafeed && clusterState.nodes().getMasterNode().getVersion().onOrAfter(
-                    MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {
+                if (hasRunningDatafeed && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {
 
                     // This job has a running datafeed attached to it.
                     // In order to prevent gaps in the model we revert to the current snapshot deleting intervening results.
@@ -255,6 +257,10 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
         hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener);
     }
 
+    private boolean isMasterNodeVersionOnOrAfter(Version version) {
+        return clusterState.nodes().getMasterNode().getVersion().onOrAfter(version);
+    }
+
     private void hasRunningDatafeedTask(String jobId, ActionListener<Boolean> listener) {
         ActionListener<Set<String>> datafeedListener = ActionListener.wrap(
             datafeeds -> {
@@ -275,9 +281,7 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
         datafeedConfigProvider.findDatafeedsForJobIds(Collections.singleton(jobId), datafeedListener);
     }
 
-    private void revertToCurrentSnapshot(String jobId, ActionListener<RevertModelSnapshotAction.Response> listener) {
-        logger.info("[{}] job has running datafeed task; reverting to current snapshot", jobId);
-
+    private void revertToCurrentSnapshot(String jobId, ActionListener<Boolean> listener) {
         ActionListener<GetJobsAction.Response> jobListener = ActionListener.wrap(
             jobResponse -> {
                 List<Job> jobPage = jobResponse.getResponse().results();
@@ -285,12 +289,28 @@ public class OpenJobPersistentTasksExecutor extends AbstractJobPersistentTasksEx
                 assert jobPage.size() == 1;
 
                 String jobSnapshotId = jobPage.get(0).getModelSnapshotId();
-                RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request(jobId,
-                    jobSnapshotId == null ? ModelSnapshot.EMPTY_SNAPSHOT_ID : jobSnapshotId);
-                request.setForce(true);
-                request.setDeleteInterveningResults(true);
-                request.masterNodeTimeout(PERSISTENT_TASK_MASTER_NODE_TIMEOUT);
-                executeAsyncWithOrigin(client, ML_ORIGIN, RevertModelSnapshotAction.INSTANCE, request, listener);
+                if (jobSnapshotId == null && isMasterNodeVersionOnOrAfter(ResetJobAction.VERSION_INTRODUCED)) {
+                    logger.info("[{}] job has running datafeed task; resetting as no snapshot exists", jobId);
+                    ResetJobAction.Request request = new ResetJobAction.Request(jobId);
+                    request.setSkipJobStateValidation(true);
+                    request.masterNodeTimeout(PERSISTENT_TASK_MASTER_NODE_TIMEOUT);
+                    request.timeout(PERSISTENT_TASK_MASTER_NODE_TIMEOUT);
+                    executeAsyncWithOrigin(client, ML_ORIGIN, ResetJobAction.INSTANCE, request, ActionListener.wrap(
+                        response -> listener.onResponse(true),
+                        listener::onFailure
+                    ));
+                } else {
+                    logger.info("[{}] job has running datafeed task; reverting to current snapshot", jobId);
+                    RevertModelSnapshotAction.Request request = new RevertModelSnapshotAction.Request(jobId,
+                        jobSnapshotId == null ? ModelSnapshot.EMPTY_SNAPSHOT_ID : jobSnapshotId);
+                    request.setForce(true);
+                    request.setDeleteInterveningResults(true);
+                    request.masterNodeTimeout(PERSISTENT_TASK_MASTER_NODE_TIMEOUT);
+                    executeAsyncWithOrigin(client, ML_ORIGIN, RevertModelSnapshotAction.INSTANCE, request, ActionListener.wrap(
+                        response -> listener.onResponse(true),
+                        listener::onFailure
+                    ));
+                }
             },
             error -> listener.onFailure(ExceptionsHelper.serverError("[{}] error getting job", error, jobId))
         );

+ 76 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestResetJobAction.java

@@ -0,0 +1,76 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.ml.rest.job;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.BytesRestResponse;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskListener;
+import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
+import org.elasticsearch.xpack.core.ml.job.config.Job;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+import static org.elasticsearch.xpack.ml.MachineLearning.BASE_PATH;
+
+public class RestResetJobAction extends BaseRestHandler {
+
+    @Override
+    public List<Route> routes() {
+        return List.of(
+            new Route(POST, BASE_PATH + "anomaly_detectors/{" + Job.ID + "}/_reset")
+        );
+    }
+
+    @Override
+    public String getName() {
+        return "ml_reset_job_action";
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
+        ResetJobAction.Request request = new ResetJobAction.Request(restRequest.param(Job.ID.getPreferredName()));
+        request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
+        request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
+
+        if (restRequest.paramAsBoolean("wait_for_completion", true)) {
+            return channel -> client.execute(ResetJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
+        } else {
+            request.setShouldStoreResult(true);
+            Task task = client.executeLocally(ResetJobAction.INSTANCE, request, nullTaskListener());
+            return channel -> {
+                try (XContentBuilder builder = channel.newBuilder()) {
+                    builder.startObject();
+                    builder.field("task", client.getLocalNodeId() + ":" + task.getId());
+                    builder.endObject();
+                    channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
+                }
+            };
+        }
+    }
+
+    // We do not want to log anything due to a reset action.
+    // The response or error will be returned to the client when called synchronously,
+    // or it will be stored in the task result when called asynchronously.
+    private static <T> TaskListener<T> nullTaskListener() {
+        return new TaskListener<T>() {
+            @Override
+            public void onResponse(Task task, T o) {}
+
+            @Override
+            public void onFailure(Task task, Exception e) {}
+        };
+    }
+}
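
When `wait_for_completion` is `false`, the handler above stores the task result and immediately returns a body whose `task` field is the local node id and task id joined by a colon. The sketch below simply mirrors that response shape; the class and method names are illustrative, not Elasticsearch APIs.

```
// Illustrative sketch of the asynchronous response body; names are not Elasticsearch APIs.
public final class AsyncTaskResponseSketch {

    // Mirrors the "<local node id>:<task id>" value written into the "task" field.
    static String taskField(String localNodeId, long taskId) {
        return localNodeId + ":" + taskId;
    }

    // Builds the JSON body returned to the caller, e.g. {"task":"node-0:42"}.
    static String asyncResponseBody(String localNodeId, long taskId) {
        return "{\"task\":\"" + taskField(localNodeId, taskId) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(asyncResponseBody("node-0", 42L)); // {"task":"node-0:42"}
    }
}
```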

+ 8 - 5
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobTests;
 
@@ -40,7 +41,9 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
         MlMetadata.Builder builder = new MlMetadata.Builder();
         int numJobs = randomIntBetween(0, 10);
         for (int i = 0; i < numJobs; i++) {
-            Job job = JobTests.createRandomizedJob();
+            Job.Builder job = new Job.Builder(JobTests.createRandomizedJob());
+            job.setDeleting(false);
+            job.setBlocked(Blocked.none());
             if (randomBoolean()) {
                 AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(job.getAnalysisConfig());
                 analysisConfig.setLatency(null);
@@ -49,11 +52,11 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
                 if (datafeedConfig.hasAggregations()) {
                     analysisConfig.setSummaryCountFieldName("doc_count");
                 }
-                job = new Job.Builder(job).setAnalysisConfig(analysisConfig).build();
-                builder.putJob(job, false);
+                job.setAnalysisConfig(analysisConfig);
+                builder.putJob(job.build(), false);
                 builder.putDatafeed(datafeedConfig, Collections.emptyMap(), xContentRegistry());
             } else {
-                builder.putJob(job, false);
+                builder.putJob(job.build(), false);
             }
         }
         return builder.isResetMode(randomBoolean()).isUpgradeMode(randomBoolean()).build();
@@ -181,7 +184,7 @@ public class MlMetadataTests extends AbstractSerializingTestCase<MlMetadata> {
             if (datafeedConfig.hasAggregations()) {
                 analysisConfig.setSummaryCountFieldName("doc_count");
             }
-            randomJob = new Job.Builder(randomJob).setAnalysisConfig(analysisConfig).build();
+            randomJob = new Job.Builder(randomJob).setAnalysisConfig(analysisConfig).setDeleting(false).setBlocked(Blocked.none()).build();
             metadataBuilder.putJob(randomJob, false);
             metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap(), xContentRegistry());
             break;

+ 10 - 1
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutorTests.java

@@ -42,6 +42,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
+import org.elasticsearch.xpack.core.ml.job.config.Blocked;
 import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
@@ -115,7 +116,15 @@ public class OpenJobPersistentTasksExecutorTests extends ESTestCase {
         jobBuilder.setDeleting(true);
         Exception e = expectThrows(ElasticsearchStatusException.class,
             () -> validateJobAndId("job_id", jobBuilder.build()));
-        assertEquals("Cannot open job [job_id] because it is being deleted", e.getMessage());
+        assertEquals("Cannot open job [job_id] because it is executing [delete]", e.getMessage());
+    }
+
+    public void testValidate_blockedRevert() {
+        Job.Builder jobBuilder = buildJobBuilder("job_id");
+        jobBuilder.setBlocked(new Blocked(Blocked.Reason.REVERT, null));
+        Exception e = expectThrows(ElasticsearchStatusException.class,
+            () -> validateJobAndId("job_id", jobBuilder.build()));
+        assertEquals("Cannot open job [job_id] because it is executing [revert]", e.getMessage());
     }
 
     public void testValidate_jobWithoutVersion() {

+ 1 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -154,6 +154,7 @@ public class Constants {
         "cluster:admin/xpack/ml/job/open",
         "cluster:admin/xpack/ml/job/persist",
         "cluster:admin/xpack/ml/job/put",
+        "cluster:admin/xpack/ml/job/reset",
         "cluster:admin/xpack/ml/job/update",
         "cluster:admin/xpack/ml/job/validate",
         "cluster:admin/xpack/ml/job/validate/detector",

+ 47 - 0
x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/reset_job.yml

@@ -0,0 +1,47 @@
+setup:
+  - skip:
+      features: headers
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      ml.put_job:
+        job_id: reset-job
+        body:  >
+          {
+            "job_id":"reset-job",
+            "analysis_config" : {
+                "bucket_span": "1h",
+                "detectors" :[{"function":"count"}]
+            },
+            "data_description" : {
+                "field_delimiter":",",
+                "time_field":"time",
+                "time_format":"yyyy-MM-dd HH:mm:ssX"
+            }
+          }
+
+---
+"Test reset":
+  - do:
+      ml.reset_job:
+        job_id: reset-job
+  - match: { acknowledged: true }
+
+---
+"Test reset given unknown job id":
+  - do:
+      catch: missing
+      ml.reset_job:
+        job_id: not-a-job
+
+---
+"Test reset given job is open":
+  - do:
+      ml.open_job:
+        job_id: reset-job
+  - match: { opened: true }
+
+  - do:
+      catch: /Can only reset a job when it is closed/
+      ml.reset_job:
+        job_id: reset-job
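
The last YAML test relies on the reset action rejecting jobs that are not closed, while the open-job executor above bypasses that check via `setSkipJobStateValidation(true)`. The sketch below is only an illustrative approximation of that precondition, not the actual transport action code; the class, enum, and exception choice are assumptions.

```
// Illustrative approximation of the "Can only reset a job when it is closed" check.
public final class ResetPreconditionSketch {

    enum JobState { OPENING, OPENED, CLOSING, CLOSED }

    static void checkCanReset(String jobId, JobState state, boolean skipJobStateValidation) {
        if (skipJobStateValidation) {
            return; // internal callers such as the open-job executor bypass the check
        }
        if (state != JobState.CLOSED) {
            throw new IllegalStateException(
                "Can only reset a job when it is closed; [" + jobId + "] is " + state);
        }
    }

    public static void main(String[] args) {
        checkCanReset("reset-job", JobState.CLOSED, false);       // passes
        try {
            checkCanReset("reset-job", JobState.OPENED, false);   // rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```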