1
0
Эх сурвалжийг харах

[ML] Implement force deleting a data frame analytics job (#50553)

Adds a `force` parameter to the delete data frame analytics
request. When `force` is `true`, the action force-stops the
jobs and then proceeds to the deletion. This can be used in
order to delete a non-stopped job with a single request.

Closes #48124
Dimitris Athanasiou 5 жил өмнө
parent
commit
af0ce426cc

+ 12 - 3
client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

@@ -29,8 +29,6 @@ import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.client.RequestConverters.EndpointBuilder;
 import org.elasticsearch.client.core.PageParams;
 import org.elasticsearch.client.ml.CloseJobRequest;
-import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
-import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
 import org.elasticsearch.client.ml.DeleteCalendarEventRequest;
 import org.elasticsearch.client.ml.DeleteCalendarJobRequest;
 import org.elasticsearch.client.ml.DeleteCalendarRequest;
@@ -41,7 +39,9 @@ import org.elasticsearch.client.ml.DeleteFilterRequest;
 import org.elasticsearch.client.ml.DeleteForecastRequest;
 import org.elasticsearch.client.ml.DeleteJobRequest;
 import org.elasticsearch.client.ml.DeleteModelSnapshotRequest;
+import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
 import org.elasticsearch.client.ml.EvaluateDataFrameRequest;
+import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
 import org.elasticsearch.client.ml.FindFileStructureRequest;
 import org.elasticsearch.client.ml.FlushJobRequest;
 import org.elasticsearch.client.ml.ForecastJobRequest;
@@ -692,7 +692,16 @@ final class MLRequestConverters {
             .addPathPartAsIs("_ml", "data_frame", "analytics")
             .addPathPart(deleteRequest.getId())
             .build();
-        return new Request(HttpDelete.METHOD_NAME, endpoint);
+
+        Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
+
+        RequestConverters.Params params = new RequestConverters.Params();
+        if (deleteRequest.getForce() != null) {
+            params.putParam("force", Boolean.toString(deleteRequest.getForce()));
+        }
+        request.addParameters(params.asMap());
+
+        return request;
     }
 
     static Request evaluateDataFrame(EvaluateDataFrameRequest evaluateRequest) throws IOException {

+ 17 - 2
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteDataFrameAnalyticsRequest.java

@@ -31,6 +31,7 @@ import java.util.Optional;
 public class DeleteDataFrameAnalyticsRequest implements Validatable {
 
     private final String id;
+    private Boolean force;
 
     public DeleteDataFrameAnalyticsRequest(String id) {
         this.id = id;
@@ -40,6 +41,20 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
         return id;
     }
 
+    public Boolean getForce() {
+        return force;
+    }
+
+    /**
+     * Used to forcefully delete an job that is not stopped.
+     * This method is quicker than stopping and deleting the job.
+     *
+     * @param force When {@code true} forcefully delete a non stopped job. Defaults to {@code false}
+     */
+    public void setForce(Boolean force) {
+        this.force = force;
+    }
+
     @Override
     public Optional<ValidationException> validate() {
         if (id == null) {
@@ -54,11 +69,11 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
         if (o == null || getClass() != o.getClass()) return false;
 
         DeleteDataFrameAnalyticsRequest other = (DeleteDataFrameAnalyticsRequest) o;
-        return Objects.equals(id, other.id);
+        return Objects.equals(id, other.id) && Objects.equals(force, other.force);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id);
+        return Objects.hash(id, force);
     }
 }

+ 11 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

@@ -25,8 +25,6 @@ import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
 import org.elasticsearch.client.core.PageParams;
 import org.elasticsearch.client.ml.CloseJobRequest;
-import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
-import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
 import org.elasticsearch.client.ml.DeleteCalendarEventRequest;
 import org.elasticsearch.client.ml.DeleteCalendarJobRequest;
 import org.elasticsearch.client.ml.DeleteCalendarRequest;
@@ -37,8 +35,10 @@ import org.elasticsearch.client.ml.DeleteFilterRequest;
 import org.elasticsearch.client.ml.DeleteForecastRequest;
 import org.elasticsearch.client.ml.DeleteJobRequest;
 import org.elasticsearch.client.ml.DeleteModelSnapshotRequest;
+import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
 import org.elasticsearch.client.ml.EvaluateDataFrameRequest;
 import org.elasticsearch.client.ml.EvaluateDataFrameRequestTests;
+import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
 import org.elasticsearch.client.ml.FindFileStructureRequest;
 import org.elasticsearch.client.ml.FindFileStructureRequestTests;
 import org.elasticsearch.client.ml.FlushJobRequest;
@@ -778,6 +778,15 @@ public class MLRequestConvertersTests extends ESTestCase {
         assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
         assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint());
         assertNull(request.getEntity());
+        assertThat(request.getParameters().isEmpty(), is(true));
+
+        deleteRequest = new DeleteDataFrameAnalyticsRequest(randomAlphaOfLength(10));
+        deleteRequest.setForce(true);
+        request = MLRequestConverters.deleteDataFrameAnalytics(deleteRequest);
+        assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
+        assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint());
+        assertNull(request.getEntity());
+        assertEquals(Boolean.toString(true), request.getParameters().get("force"));
     }
 
     public void testEvaluateDataFrame() throws IOException {

+ 5 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java

@@ -1616,8 +1616,11 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
             machineLearningClient::getDataFrameAnalytics, machineLearningClient::getDataFrameAnalyticsAsync);
         assertThat(getDataFrameAnalyticsResponse.getAnalytics(), hasSize(1));
 
-        AcknowledgedResponse deleteDataFrameAnalyticsResponse = execute(
-            new DeleteDataFrameAnalyticsRequest(configId),
+        DeleteDataFrameAnalyticsRequest deleteRequest = new DeleteDataFrameAnalyticsRequest(configId);
+        if (randomBoolean()) {
+            deleteRequest.setForce(randomBoolean());
+        }
+        AcknowledgedResponse deleteDataFrameAnalyticsResponse = execute(deleteRequest,
             machineLearningClient::deleteDataFrameAnalytics, machineLearningClient::deleteDataFrameAnalyticsAsync);
         assertTrue(deleteDataFrameAnalyticsResponse.isAcknowledged());
 

+ 7 - 3
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java

@@ -36,9 +36,6 @@ import org.elasticsearch.client.core.PageParams;
 import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.ml.CloseJobRequest;
 import org.elasticsearch.client.ml.CloseJobResponse;
-import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
-import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
-import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsResponse;
 import org.elasticsearch.client.ml.DeleteCalendarEventRequest;
 import org.elasticsearch.client.ml.DeleteCalendarJobRequest;
 import org.elasticsearch.client.ml.DeleteCalendarRequest;
@@ -51,8 +48,11 @@ import org.elasticsearch.client.ml.DeleteForecastRequest;
 import org.elasticsearch.client.ml.DeleteJobRequest;
 import org.elasticsearch.client.ml.DeleteJobResponse;
 import org.elasticsearch.client.ml.DeleteModelSnapshotRequest;
+import org.elasticsearch.client.ml.DeleteTrainedModelRequest;
 import org.elasticsearch.client.ml.EvaluateDataFrameRequest;
 import org.elasticsearch.client.ml.EvaluateDataFrameResponse;
+import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsRequest;
+import org.elasticsearch.client.ml.ExplainDataFrameAnalyticsResponse;
 import org.elasticsearch.client.ml.FindFileStructureRequest;
 import org.elasticsearch.client.ml.FindFileStructureResponse;
 import org.elasticsearch.client.ml.FlushJobRequest;
@@ -3065,6 +3065,10 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
             DeleteDataFrameAnalyticsRequest request = new DeleteDataFrameAnalyticsRequest("my-analytics-config"); // <1>
             // end::delete-data-frame-analytics-request
 
+            //tag::delete-data-frame-analytics-request-force
+            request.setForce(false); // <1>
+            //end::delete-data-frame-analytics-request-force
+
             // tag::delete-data-frame-analytics-execute
             AcknowledgedResponse response = client.machineLearning().deleteDataFrameAnalytics(request, RequestOptions.DEFAULT);
             // end::delete-data-frame-analytics-execute

+ 11 - 0
docs/java-rest/high-level/ml/delete-data-frame-analytics.asciidoc

@@ -21,6 +21,17 @@ include-tagged::{doc-tests-file}[{api}-request]
 ---------------------------------------------------
 <1> Constructing a new request referencing an existing {dfanalytics-job}.
 
+==== Optional arguments
+
+The following arguments are optional:
+
+["source","java",subs="attributes,callouts,macros"]
+---------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-request-force]
+---------------------------------------------------
+<1> Use to forcefully delete a job that is not stopped. This method is quicker than stopping
+and deleting the job. Defaults to `false`.
+
 include::../execution.asciidoc[]
 
 [id="{upid}-{api}-response"]

+ 8 - 1
docs/reference/ml/df-analytics/apis/delete-dfanalytics.asciidoc

@@ -21,7 +21,7 @@ experimental[]
 [[ml-delete-dfanalytics-prereq]]
 ==== {api-prereq-title}
 
-* You must have `machine_learning_admin` built-in role to use this API. For more 
+* You must have `machine_learning_admin` built-in role to use this API. For more
 information, see <<security-privileges>> and <<built-in-roles>>.
 
 
@@ -32,6 +32,13 @@ information, see <<security-privileges>> and <<built-in-roles>>.
 (Required, string)
 include::{docdir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics]
 
+[[ml-delete-dfanalytics-query-params]]
+==== {api-query-parms-title}
+
+`force`::
+  (Optional, boolean) If `true`, it deletes a job that is not stopped; this method is
+  quicker than stopping and deleting the job.
+
 
 [[ml-delete-dfanalytics-example]]
 ==== {api-examples-title}

+ 24 - 11
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteDataFrameAnalyticsAction.java

@@ -5,16 +5,16 @@
  */
 package org.elasticsearch.xpack.core.ml.action;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
 import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.xcontent.ToXContentFragment;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 
@@ -30,13 +30,21 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
         super(NAME, AcknowledgedResponse::new);
     }
 
-    public static class Request extends AcknowledgedRequest<Request> implements ToXContentFragment {
+    public static class Request extends AcknowledgedRequest<Request> {
+
+        public static final ParseField FORCE = new ParseField("force");
 
         private String id;
+        private boolean force;
 
         public Request(StreamInput in) throws IOException {
             super(in);
             id = in.readString();
+            if (in.getVersion().onOrAfter(Version.CURRENT)) {
+                force = in.readBoolean();
+            } else {
+                force = false;
+            }
         }
 
         public Request() {}
@@ -49,15 +57,17 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
             return id;
         }
 
-        @Override
-        public ActionRequestValidationException validate() {
-            return null;
+        public boolean isForce() {
+            return force;
+        }
+
+        public void setForce(boolean force) {
+            this.force = force;
         }
 
         @Override
-        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            builder.field(DataFrameAnalyticsConfig.ID.getPreferredName(), id);
-            return builder;
+        public ActionRequestValidationException validate() {
+            return null;
         }
 
         @Override
@@ -65,18 +75,21 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             DeleteDataFrameAnalyticsAction.Request request = (DeleteDataFrameAnalyticsAction.Request) o;
-            return Objects.equals(id, request.id);
+            return Objects.equals(id, request.id) && force == request.force;
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeString(id);
+            if (out.getVersion().onOrAfter(Version.CURRENT)) {
+                out.writeBoolean(force);
+            }
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(id);
+            return Objects.hash(id, force);
         }
     }
 

+ 0 - 9
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/KillProcessAction.java

@@ -6,9 +6,7 @@
 package org.elasticsearch.xpack.core.ml.action;
 
 import org.elasticsearch.action.ActionType;
-import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.support.tasks.BaseTasksResponse;
-import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -25,13 +23,6 @@ public class KillProcessAction extends ActionType<KillProcessAction.Response> {
         super(NAME, KillProcessAction.Response::new);
     }
 
-    static class RequestBuilder extends ActionRequestBuilder<Request, Response> {
-
-        RequestBuilder(ElasticsearchClient client, KillProcessAction action) {
-            super(client, action, new Request());
-        }
-    }
-
     public static class Request extends JobTaskRequest<Request> {
 
         public Request(String jobId) {

+ 27 - 3
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java

@@ -38,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ml.MlTasks;
 import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction;
+import org.elasticsearch.xpack.core.ml.action.StopDataFrameAnalyticsAction;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
 import org.elasticsearch.xpack.core.ml.job.messages.Messages;
@@ -100,6 +101,32 @@ public class TransportDeleteDataFrameAnalyticsAction
     protected void masterOperation(Task task, DeleteDataFrameAnalyticsAction.Request request, ClusterState state,
                                    ActionListener<AcknowledgedResponse> listener) {
         String id = request.getId();
+        TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
+        ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);
+
+        if (request.isForce()) {
+            forceDelete(parentTaskClient, id, listener);
+        } else {
+            normalDelete(parentTaskClient, state, id, listener);
+        }
+    }
+
+    private void forceDelete(ParentTaskAssigningClient parentTaskClient, String id,
+                             ActionListener<AcknowledgedResponse> listener) {
+        logger.debug("[{}] Force deleting data frame analytics job", id);
+
+        ActionListener<StopDataFrameAnalyticsAction.Response> stopListener = ActionListener.wrap(
+            stopResponse -> normalDelete(parentTaskClient, clusterService.state(), id, listener),
+            listener::onFailure
+        );
+
+        StopDataFrameAnalyticsAction.Request request = new StopDataFrameAnalyticsAction.Request(id);
+        request.setForce(true);
+        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, request, stopListener);
+    }
+
+    private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterState state, String id,
+                              ActionListener<AcknowledgedResponse> listener) {
         PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
         DataFrameAnalyticsState taskState = MlTasks.getDataFrameAnalyticsState(id, tasks);
         if (taskState != DataFrameAnalyticsState.STOPPED) {
@@ -108,9 +135,6 @@ public class TransportDeleteDataFrameAnalyticsAction
             return;
         }
 
-        TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
-        ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);
-
         // We clean up the memory tracker on delete because there is no stop; the task stops by itself
         memoryTracker.removeDataFrameAnalyticsJob(id);
 

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestDeleteDataFrameAnalyticsAction.java

@@ -32,6 +32,7 @@ public class RestDeleteDataFrameAnalyticsAction extends BaseRestHandler {
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
         String id = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName());
         DeleteDataFrameAnalyticsAction.Request request = new DeleteDataFrameAnalyticsAction.Request(id);
+        request.setForce(restRequest.paramAsBoolean(DeleteDataFrameAnalyticsAction.Request.FORCE.getPreferredName(), request.isForce()));
         return channel -> client.execute(DeleteDataFrameAnalyticsAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 }

+ 7 - 0
x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_data_frame_analytics.json

@@ -19,6 +19,13 @@
           }
         }
       ]
+    },
+    "params":{
+      "force":{
+        "type":"boolean",
+        "description":"True if the job should be forcefully deleted",
+        "default":false
+      }
     }
   }
 }