Browse Source

[ML] Fix race condition when force stopping DF analytics job (#57680)

When we force delete a DF analytics job, we currently first force
stop it and then we proceed with deleting the job config.
This may result in logging errors if the job config is deleted
before it is retrieved while the job is starting.

Instead of force stopping the job, it would make more sense to
try to stop the job gracefully first. So we now try that out first.
If normal stop fails, then we resort to force stopping the job to
ensure we can go through with the delete.

In addition, this commit introduces `timeout` for the delete action
and makes use of it in the child requests.
Dimitris Athanasiou 5 years ago
parent
commit
e116ac850f

+ 3 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

@@ -712,6 +712,9 @@ final class MLRequestConverters {
         if (deleteRequest.getForce() != null) {
             params.putParam("force", Boolean.toString(deleteRequest.getForce()));
         }
+        if (deleteRequest.getTimeout() != null) {
+            params.withTimeout(deleteRequest.getTimeout());
+        }
         request.addParameters(params.asMap());
 
         return request;

+ 19 - 2
client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteDataFrameAnalyticsRequest.java

@@ -21,6 +21,7 @@ package org.elasticsearch.client.ml;
 
 import org.elasticsearch.client.Validatable;
 import org.elasticsearch.client.ValidationException;
+import org.elasticsearch.common.unit.TimeValue;
 
 import java.util.Objects;
 import java.util.Optional;
@@ -32,6 +33,7 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
 
     private final String id;
     private Boolean force;
+    private TimeValue timeout;
 
     public DeleteDataFrameAnalyticsRequest(String id) {
         this.id = id;
@@ -55,6 +57,19 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
         this.force = force;
     }
 
+    public TimeValue getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Sets the time to wait until the job is deleted.
+     *
+     * @param timeout The time to wait until the job is deleted.
+     */
+    public void setTimeout(TimeValue timeout) {
+        this.timeout = timeout;
+    }
+
     @Override
     public Optional<ValidationException> validate() {
         if (id == null) {
@@ -69,11 +84,13 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable {
         if (o == null || getClass() != o.getClass()) return false;
 
         DeleteDataFrameAnalyticsRequest other = (DeleteDataFrameAnalyticsRequest) o;
-        return Objects.equals(id, other.id) && Objects.equals(force, other.force);
+        return Objects.equals(id, other.id)
+            && Objects.equals(force, other.force)
+            && Objects.equals(timeout, other.timeout);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, force);
+        return Objects.hash(id, force, timeout);
     }
 }

+ 12 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java

@@ -816,9 +816,21 @@ public class MLRequestConvertersTests extends ESTestCase {
         assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
         assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint());
         assertNull(request.getEntity());
+        assertThat(request.getParameters().size(), equalTo(1));
         assertEquals(Boolean.toString(true), request.getParameters().get("force"));
     }
 
+    public void testDeleteDataFrameAnalytics_WithTimeout() {
+        DeleteDataFrameAnalyticsRequest deleteRequest = new DeleteDataFrameAnalyticsRequest(randomAlphaOfLength(10));
+        deleteRequest.setTimeout(TimeValue.timeValueSeconds(10));
+        Request request = MLRequestConverters.deleteDataFrameAnalytics(deleteRequest);
+        assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
+        assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint());
+        assertNull(request.getEntity());
+        assertThat(request.getParameters().size(), equalTo(1));
+        assertEquals(request.getParameters().get("timeout"), "10s");
+    }
+
     public void testEvaluateDataFrame() throws IOException {
         EvaluateDataFrameRequest evaluateRequest = EvaluateDataFrameRequestTests.createRandom();
         Request request = MLRequestConverters.evaluateDataFrame(evaluateRequest);

+ 3 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java

@@ -3090,9 +3090,10 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
             DeleteDataFrameAnalyticsRequest request = new DeleteDataFrameAnalyticsRequest("my-analytics-config"); // <1>
             // end::delete-data-frame-analytics-request
 
-            //tag::delete-data-frame-analytics-request-force
+            //tag::delete-data-frame-analytics-request-options
             request.setForce(false); // <1>
-            //end::delete-data-frame-analytics-request-force
+            request.setTimeout(TimeValue.timeValueMinutes(1)); // <2>
+            //end::delete-data-frame-analytics-request-options
 
             // tag::delete-data-frame-analytics-execute
             AcknowledgedResponse response = client.machineLearning().deleteDataFrameAnalytics(request, RequestOptions.DEFAULT);

+ 2 - 1
docs/java-rest/high-level/ml/delete-data-frame-analytics.asciidoc

@@ -27,10 +27,11 @@ The following arguments are optional:
 
 ["source","java",subs="attributes,callouts,macros"]
 ---------------------------------------------------
-include-tagged::{doc-tests-file}[{api}-request-force]
+include-tagged::{doc-tests-file}[{api}-request-options]
 ---------------------------------------------------
 <1> Use to forcefully delete a job that is not stopped. This method is quicker than stopping
 and deleting the job. Defaults to `false`.
+<2> Use to set the time to wait until the job is deleted. Defaults to 1 minute.
 
 include::../execution.asciidoc[]
 

+ 5 - 0
docs/reference/ml/df-analytics/apis/delete-dfanalytics.asciidoc

@@ -44,6 +44,11 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics]
   (Optional, boolean) If `true`, it deletes a job that is not stopped; this method is
   quicker than stopping and deleting the job.
 
+`timeout`::
+  (Optional, <<time-units,time units>>) The time to wait for the job to be deleted.
+  Defaults to 1 minute.
+
+
 
 [[ml-delete-dfanalytics-example]]
 ==== {api-examples-title}

+ 14 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteDataFrameAnalyticsAction.java

@@ -15,11 +15,13 @@ import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedResponse> {
 
@@ -33,6 +35,10 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
     public static class Request extends AcknowledgedRequest<Request> {
 
         public static final ParseField FORCE = new ParseField("force");
+        public static final ParseField TIMEOUT = new ParseField("timeout");
+
+        // Default timeout matches that of delete by query
+        private static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
 
         private String id;
         private boolean force;
@@ -47,9 +53,12 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
             }
         }
 
-        public Request() {}
+        public Request() {
+            timeout(DEFAULT_TIMEOUT);
+        }
 
         public Request(String id) {
+            this();
             this.id = ExceptionsHelper.requireNonNull(id, DataFrameAnalyticsConfig.ID);
         }
 
@@ -75,7 +84,9 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
             DeleteDataFrameAnalyticsAction.Request request = (DeleteDataFrameAnalyticsAction.Request) o;
-            return Objects.equals(id, request.id) && force == request.force;
+            return Objects.equals(id, request.id)
+                && force == request.force
+                && Objects.equals(timeout, request.timeout);
         }
 
         @Override
@@ -89,7 +100,7 @@ public class DeleteDataFrameAnalyticsAction extends ActionType<AcknowledgedRespo
 
         @Override
         public int hashCode() {
-            return Objects.hash(id, force);
+            return Objects.hash(id, force, timeout);
         }
     }
 

+ 4 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteDataFrameAnalyticsActionRequestTests.java

@@ -6,6 +6,7 @@
 package org.elasticsearch.xpack.core.ml.action;
 
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import org.elasticsearch.xpack.core.ml.action.DeleteDataFrameAnalyticsAction.Request;
 
@@ -18,6 +19,9 @@ public class DeleteDataFrameAnalyticsActionRequestTests extends AbstractWireSeri
     protected Request createTestInstance() {
         Request request = new Request(randomAlphaOfLength(10));
         request.setForce(randomBoolean());
+        if (randomBoolean()) {
+            request.timeout(TimeValue.parseTimeValue(randomTimeValue(), "test"));
+        }
         return request;
     }
 

+ 47 - 14
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDataFrameAnalyticsAction.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -26,6 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
@@ -103,33 +105,59 @@ public class TransportDeleteDataFrameAnalyticsAction
     @Override
     protected void masterOperation(Task task, DeleteDataFrameAnalyticsAction.Request request, ClusterState state,
                                    ActionListener<AcknowledgedResponse> listener) {
-        String id = request.getId();
         TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
         ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId);
 
         if (request.isForce()) {
-            forceDelete(parentTaskClient, id, listener);
+            forceDelete(parentTaskClient, request, listener);
         } else {
-            normalDelete(parentTaskClient, state, id, listener);
+            normalDelete(parentTaskClient, state, request, listener);
         }
     }
 
-    private void forceDelete(ParentTaskAssigningClient parentTaskClient, String id,
+    private void forceDelete(ParentTaskAssigningClient parentTaskClient, DeleteDataFrameAnalyticsAction.Request request,
                              ActionListener<AcknowledgedResponse> listener) {
-        logger.debug("[{}] Force deleting data frame analytics job", id);
+        logger.debug("[{}] Force deleting data frame analytics job", request.getId());
 
         ActionListener<StopDataFrameAnalyticsAction.Response> stopListener = ActionListener.wrap(
-            stopResponse -> normalDelete(parentTaskClient, clusterService.state(), id, listener),
+            stopResponse -> normalDelete(parentTaskClient, clusterService.state(), request, listener),
             listener::onFailure
         );
 
-        StopDataFrameAnalyticsAction.Request request = new StopDataFrameAnalyticsAction.Request(id);
-        request.setForce(true);
-        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, request, stopListener);
+        stopJob(parentTaskClient, request, stopListener);
     }
 
-    private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterState state, String id,
-                              ActionListener<AcknowledgedResponse> listener) {
+    private void stopJob(ParentTaskAssigningClient parentTaskClient, DeleteDataFrameAnalyticsAction.Request request,
+                         ActionListener<StopDataFrameAnalyticsAction.Response> listener) {
+        // We first try to stop the job normally. Normal stop returns after the job was stopped.
+        // If that fails then we proceed to force stopping which returns as soon as the persistent task is removed.
+        // If we just did force stopping, then there is a chance we proceed to delete the config while it's
+        // still used from the running task which results in logging errors.
+
+        StopDataFrameAnalyticsAction.Request stopRequest = new StopDataFrameAnalyticsAction.Request(request.getId());
+        stopRequest.setTimeout(request.timeout());
+
+        ActionListener<StopDataFrameAnalyticsAction.Response> normalStopListener = ActionListener.wrap(
+            listener::onResponse,
+            normalStopFailure -> {
+                stopRequest.setForce(true);
+                executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, stopRequest, ActionListener.wrap(
+                    listener::onResponse,
+                    forceStopFailure -> {
+                        logger.error(new ParameterizedMessage("[{}] Failed to stop normally", request.getId()), normalStopFailure);
+                        logger.error(new ParameterizedMessage("[{}] Failed to stop forcefully", request.getId()), forceStopFailure);
+                        listener.onFailure(forceStopFailure);
+                    }
+                ));
+            }
+        );
+
+        executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, stopRequest, normalStopListener);
+    }
+
+    private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterState state,
+                              DeleteDataFrameAnalyticsAction.Request request, ActionListener<AcknowledgedResponse> listener) {
+        String id = request.getId();
         PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
         DataFrameAnalyticsState taskState = MlTasks.getDataFrameAnalyticsState(id, tasks);
         if (taskState != DataFrameAnalyticsState.STOPPED) {
@@ -172,14 +200,14 @@ public class TransportDeleteDataFrameAnalyticsAction
                         logger.warn("[{}] DBQ failure: {}", id, failure);
                     }
                 }
-                deleteStats(parentTaskClient, id, deleteStatsHandler);
+                deleteStats(parentTaskClient, id, request.timeout(), deleteStatsHandler);
             },
             listener::onFailure
         );
 
         // Step 2. Delete state
         ActionListener<DataFrameAnalyticsConfig> configListener = ActionListener.wrap(
-            config -> deleteState(parentTaskClient, config, deleteStateHandler),
+            config -> deleteState(parentTaskClient, config, request.timeout(), deleteStateHandler),
             listener::onFailure
         );
 
@@ -208,6 +236,7 @@ public class TransportDeleteDataFrameAnalyticsAction
 
     private void deleteState(ParentTaskAssigningClient parentTaskClient,
                              DataFrameAnalyticsConfig config,
+                             TimeValue timeout,
                              ActionListener<BulkByScrollResponse> listener) {
         List<String> ids = new ArrayList<>();
         ids.add(StoredProgress.documentId(config.getId()));
@@ -218,22 +247,25 @@ public class TransportDeleteDataFrameAnalyticsAction
             parentTaskClient,
             AnomalyDetectorsIndex.jobStateIndexPattern(),
             QueryBuilders.idsQuery().addIds(ids.toArray(String[]::new)),
+            timeout,
             listener
         );
     }
 
     private void deleteStats(ParentTaskAssigningClient parentTaskClient,
                              String jobId,
+                             TimeValue timeout,
                              ActionListener<BulkByScrollResponse> listener) {
         executeDeleteByQuery(
             parentTaskClient,
             MlStatsIndex.indexPattern(),
             QueryBuilders.termQuery(Fields.JOB_ID.getPreferredName(), jobId),
+            timeout,
             listener
         );
     }
 
-    private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query,
+    private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query, TimeValue timeout,
                                       ActionListener<BulkByScrollResponse> listener) {
         DeleteByQueryRequest request = new DeleteByQueryRequest(index);
         request.setQuery(query);
@@ -241,6 +273,7 @@ public class TransportDeleteDataFrameAnalyticsAction
         request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
         request.setAbortOnVersionConflict(false);
         request.setRefresh(true);
+        request.setTimeout(timeout);
         executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, listener);
     }
 

+ 1 - 0
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestDeleteDataFrameAnalyticsAction.java

@@ -37,6 +37,7 @@ public class RestDeleteDataFrameAnalyticsAction extends BaseRestHandler {
         String id = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName());
         DeleteDataFrameAnalyticsAction.Request request = new DeleteDataFrameAnalyticsAction.Request(id);
         request.setForce(restRequest.paramAsBoolean(DeleteDataFrameAnalyticsAction.Request.FORCE.getPreferredName(), request.isForce()));
+        request.timeout(restRequest.paramAsTime(DeleteDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName(), request.timeout()));
         return channel -> client.execute(DeleteDataFrameAnalyticsAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 }

+ 4 - 0
x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_data_frame_analytics.json

@@ -26,6 +26,10 @@
         "type":"boolean",
         "description":"True if the job should be forcefully deleted",
         "default":false
+      },
+      "timeout":{
+        "type":"time",
+        "description":"Controls the time to wait until a job is deleted. Defaults to 1 minute"
       }
     }
   }