소스 검색

[Rollup] Add `wait_for_completion` option to StopRollupJob API (#34811)

This adds a `wait_for_completion` flag which allows the user to block 
the Stop API until the task has actually moved to a stopped state, 
instead of returning immediately.  If the flag is set, a `timeout` parameter
can be specified to determine how long (at max) to block the API
call.  If unspecified, the timeout is 30s.

If the timeout is exceeded before the job moves to STOPPED, a
timeout exception is thrown.  Note: this is just signifying that the API
call itself timed out.  The job will remain in STOPPING and evenutally
flip over to STOPPED in the background.

If the user asks the API to block, we move over the the generic
threadpool so that we don't hold up a networking thread.
Zachary Tong 7 년 전
부모
커밋
c346a0f027

+ 30 - 0
docs/reference/rollup/apis/stop-job.asciidoc

@@ -22,6 +22,18 @@ Stopping an already stopped job has no action.
 `job_id` (required)::
   (string) Identifier for the job
 
+==== Query Parameters
+
+`wait_for_completion` (optional)::
+  (boolean) if set to true, causes the API to block until the indexer state completely stops.  If set to false, the
+  API returns immediately and the indexer will be stopped asynchronously in the background.  Defaults to `false`.
+
+`timeout` (optional)::
+  (TimeValue) if `wait_for_completion=true`, the API will block for (at maximum)
+  the specified duration while waiting for the job to stop. If more than `timeout` time has passed, the API
+  will throw a timeout exception.  Note: even if a timeout exception is thrown, the stop request is still processing and
+  will eventually move the job to `STOPPED`.  The timeout simply means the API call itself timed out while waiting
+  for the status change.  Defaults to `30s`
 
 ==== Request Body
 
@@ -85,3 +97,21 @@ A 404 `resource_not_found` exception will be thrown:
 }
 ----
 // TESTRESPONSE[s/"stack_trace": .../"stack_trace": $body.$_path/]
+
+===== Waiting for the job to stop
+
+Since only a stopped job can be deleted, it can be useful to block the StopJob API until the indexer has fully
+stopped.  This is accomplished with the `wait_for_completion` query parameter, and optionally a `timeout`:
+
+
+[source,js]
+--------------------------------------------------
+POST _xpack/rollup/job/sensor/_stop?wait_for_completion=true&timeout=10s
+--------------------------------------------------
+// CONSOLE
+// TEST[setup:sensor_started_rollup_job]
+
+The parameter will block the API call from returning until either the job has moved to `STOPPED`, or the specified
+time has elapsed.  If the specified time elapses without the job moving to `STOPPED`, a timeout exception will be thrown.
+
+If `wait_for_completion=true` is specified without a `timeout`, a default timeout of 30 seconds is used.

+ 2 - 17
test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

@@ -55,7 +55,6 @@ import org.junit.AfterClass;
 import org.junit.Before;
 
 import javax.net.ssl.SSLContext;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -69,7 +68,6 @@ import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.cert.CertificateException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -491,25 +489,12 @@ public abstract class ESRestTestCase extends ESTestCase {
             String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");
             Request request = new Request("POST", "/_xpack/rollup/job/" + jobId + "/_stop");
             request.addParameter("ignore", "404");
+            request.addParameter("wait_for_completion", "true");
+            request.addParameter("timeout", "10s");
             logger.debug("stopping rollup job [{}]", jobId);
             adminClient().performRequest(request);
         }
 
-        // TODO this is temporary until StopJob API gains the ability to block until stopped
-        boolean stopped = awaitBusy(() -> {
-            Request request = new Request("GET", "/_xpack/rollup/job/_all");
-            try {
-                Response jobsResponse = adminClient().performRequest(request);
-                String body = EntityUtils.toString(jobsResponse.getEntity());
-                // If the body contains any of the non-stopped states, at least one job is not finished yet
-                return Arrays.stream(new String[]{"started", "aborting", "stopping", "indexing"}).noneMatch(body::contains);
-            } catch (IOException e) {
-                return false;
-            }
-        }, 10, TimeUnit.SECONDS);
-
-        assertTrue("Timed out waiting for rollup job(s) to stop", stopped);
-
         for (Map<String, Object> jobConfig : jobConfigs) {
             @SuppressWarnings("unchecked")
             String jobId = (String) ((Map<String, Object>) jobConfig.get("config")).get("id");

+ 43 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/StopRollupJobAction.java

@@ -5,15 +5,19 @@
  */
 package org.elasticsearch.xpack.core.rollup.action;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.support.tasks.BaseTasksRequest;
 import org.elasticsearch.action.support.tasks.BaseTasksResponse;
 import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -23,11 +27,15 @@ import org.elasticsearch.xpack.core.rollup.RollupField;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {
 
     public static final StopRollupJobAction INSTANCE = new StopRollupJobAction();
     public static final String NAME = "cluster:admin/xpack/rollup/stop";
+    public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
+    public static final ParseField TIMEOUT = new ParseField("timeout");
+    public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
 
     private StopRollupJobAction() {
         super(NAME);
@@ -40,9 +48,17 @@ public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {
 
     public static class Request extends BaseTasksRequest<Request> implements ToXContent {
         private String id;
+        private boolean waitForCompletion = false;
+        private TimeValue timeout = null;
 
-        public Request(String id) {
+        public Request (String id) {
+            this(id, false, null);
+        }
+
+        public Request(String id, boolean waitForCompletion, @Nullable TimeValue timeout) {
             this.id = ExceptionsHelper.requireNonNull(id, RollupField.ID.getPreferredName());
+            this.timeout = timeout == null ? DEFAULT_TIMEOUT : timeout;
+            this.waitForCompletion = waitForCompletion;
         }
 
         public Request() {}
@@ -51,16 +67,34 @@ public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {
             return id;
         }
 
+        public TimeValue timeout() {
+            return timeout;
+        }
+
+        public boolean waitForCompletion() {
+            return waitForCompletion;
+        }
+
         @Override
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
             id = in.readString();
+            // TODO change this after backport
+            if (in.getVersion().onOrAfter(Version.CURRENT)) {
+                waitForCompletion = in.readBoolean();
+                timeout = in.readTimeValue();
+            }
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeString(id);
+            // TODO change this after backport
+            if (out.getVersion().onOrAfter(Version.CURRENT)) {
+                out.writeBoolean(waitForCompletion);
+                out.writeTimeValue(timeout);
+            }
         }
 
         @Override
@@ -71,12 +105,16 @@ public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.field(RollupField.ID.getPreferredName(), id);
+            builder.field(WAIT_FOR_COMPLETION.getPreferredName(), waitForCompletion);
+            if (timeout != null) {
+                builder.field(TIMEOUT.getPreferredName(), timeout);
+            }
             return builder;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(id);
+            return Objects.hash(id, waitForCompletion, timeout);
         }
 
         @Override
@@ -88,7 +126,9 @@ public class StopRollupJobAction extends Action<StopRollupJobAction.Response> {
                 return false;
             }
             Request other = (Request) obj;
-            return Objects.equals(id, other.id);
+            return Objects.equals(id, other.id)
+                && Objects.equals(waitForCompletion, other.waitForCompletion)
+                && Objects.equals(timeout, other.timeout);
         }
     }
 

+ 70 - 5
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportStopRollupAction.java

@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.rollup.action;
 
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.FailedNodeException;
@@ -14,24 +15,31 @@ import org.elasticsearch.action.support.tasks.TransportTasksAction;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
+import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
 import org.elasticsearch.xpack.rollup.job.RollupJobTask;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
 
 public class TransportStopRollupAction extends TransportTasksAction<RollupJobTask, StopRollupJobAction.Request,
-        StopRollupJobAction.Response, StopRollupJobAction.Response> {
+    StopRollupJobAction.Response, StopRollupJobAction.Response> {
 
+    private final ThreadPool threadPool;
 
     @Inject
-    public TransportStopRollupAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) {
+    public TransportStopRollupAction(TransportService transportService, ActionFilters actionFilters,
+                                     ClusterService clusterService, ThreadPool threadPool) {
         super(StopRollupJobAction.NAME, clusterService, transportService, actionFilters,
             StopRollupJobAction.Request::new, StopRollupJobAction.Response::new, ThreadPool.Names.SAME);
+        this.threadPool = threadPool;
     }
 
     @Override
@@ -45,17 +53,74 @@ public class TransportStopRollupAction extends TransportTasksAction<RollupJobTas
     }
 
     @Override
-    protected void taskOperation(StopRollupJobAction.Request request,
-                                 RollupJobTask jobTask,
+    protected void taskOperation(StopRollupJobAction.Request request, RollupJobTask jobTask,
                                  ActionListener<StopRollupJobAction.Response> listener) {
         if (jobTask.getConfig().getId().equals(request.getId())) {
-            jobTask.stop(listener);
+            jobTask.stop(maybeWrapWithBlocking(request, jobTask, listener, threadPool));
         } else {
             listener.onFailure(new RuntimeException("ID of rollup task [" + jobTask.getConfig().getId()
                     + "] does not match request's ID [" + request.getId() + "]"));
         }
     }
 
+    private static ActionListener<StopRollupJobAction.Response> maybeWrapWithBlocking(StopRollupJobAction.Request request,
+                                                                  RollupJobTask jobTask,
+                                                                  ActionListener<StopRollupJobAction.Response> listener,
+                                                                  ThreadPool threadPool) {
+        if (request.waitForCompletion()) {
+            return ActionListener.wrap(response -> {
+                if (response.isStopped()) {
+                    // The Task acknowledged that it is stopped/stopping... wait until the status actually
+                    // changes over before returning.  Switch over to Generic threadpool so
+                    // we don't block the network thread
+                    threadPool.generic().execute(() -> {
+                        try {
+                            boolean stopped = awaitBusy(() -> ((RollupJobStatus) jobTask.getStatus())
+                                .getIndexerState().equals(IndexerState.STOPPED), request.timeout());
+
+                            if (stopped) {
+                                // We have successfully confirmed a stop, send back the response
+                                listener.onResponse(response);
+                            } else {
+                                listener.onFailure(new ElasticsearchTimeoutException("Timed out after [" + request.timeout().getStringRep()
+                                    + "] while waiting for rollup job [" + request.getId() + "] to stop"));
+                            }
+                        } catch (InterruptedException e) {
+                            listener.onFailure(e);
+                        }
+                    });
+
+                } else {
+                    // Did not acknowledge stop, just return the response
+                    listener.onResponse(response);
+                }
+            }, listener::onFailure);
+        }
+        // No request to block, execute async
+        return listener;
+    }
+
+    /**
+     * Lifted from ESTestCase, must stay private and do not reuse!  This is temporary until
+     * the Rollup state refactor makes it unnecessary to await on a status change
+     */
+    private static boolean awaitBusy(BooleanSupplier breakSupplier, TimeValue maxWaitTime) throws InterruptedException {
+        long maxTimeInMillis = maxWaitTime.getMillis();
+        long timeInMillis = 1;
+        long sum = 0;
+        while (sum + timeInMillis < maxTimeInMillis) {
+            if (breakSupplier.getAsBoolean()) {
+                return true;
+            }
+            Thread.sleep(timeInMillis);
+            sum += timeInMillis;
+            timeInMillis = Math.min(1000L, timeInMillis * 2);
+        }
+        timeInMillis = maxTimeInMillis - sum;
+        Thread.sleep(Math.max(timeInMillis, 0));
+        return breakSupplier.getAsBoolean();
+    }
+
     @Override
     protected StopRollupJobAction.Response newResponse(StopRollupJobAction.Request request, List<StopRollupJobAction.Response> tasks,
                                                        List<TaskOperationFailure> taskOperationFailures,

+ 5 - 4
x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/rest/RestStopRollupJobAction.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.rollup.rest;
 
 import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
@@ -15,8 +16,6 @@ import org.elasticsearch.xpack.core.rollup.RollupField;
 import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction;
 import org.elasticsearch.xpack.rollup.Rollup;
 
-import java.io.IOException;
-
 public class RestStopRollupJobAction extends BaseRestHandler {
 
     public RestStopRollupJobAction(Settings settings, RestController controller) {
@@ -25,9 +24,11 @@ public class RestStopRollupJobAction extends BaseRestHandler {
     }
 
     @Override
-    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
+    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
         String id = restRequest.param(RollupField.ID.getPreferredName());
-        StopRollupJobAction.Request request = new StopRollupJobAction.Request(id);
+        TimeValue timeout = restRequest.paramAsTime(StopRollupJobAction.TIMEOUT.getPreferredName(), StopRollupJobAction.DEFAULT_TIMEOUT);
+        boolean waitForCompletion = restRequest.paramAsBoolean(StopRollupJobAction.WAIT_FOR_COMPLETION.getPreferredName(), false);
+        StopRollupJobAction.Request request = new StopRollupJobAction.Request(id, waitForCompletion, timeout);
 
         return channel -> client.execute(StopRollupJobAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }

+ 12 - 0
x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.rollup.stop_job.json

@@ -11,6 +11,18 @@
           "required": true,
           "description": "The ID of the job to stop"
         }
+      },
+      "params": {
+        "wait_for_completion": {
+          "type": "boolean",
+          "required": false,
+          "description": "True if the API should block until the job has fully stopped, false if should be executed async. Defaults to false."
+        },
+        "timeout": {
+          "type": "time",
+          "required": false,
+          "description": "Block for (at maximum) the specified duration while waiting for the job to stop.  Defaults to 30s."
+        }
       }
     }
   }

+ 43 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/rollup/stop_job.yml

@@ -84,3 +84,46 @@ setup:
 
   - is_true: stopped
 
+---
+"Test wait_for_completion default timeout":
+  - skip:
+      version: " - 6.6.0"
+      reason: wait_for_completion option was added in 6.6
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      xpack.rollup.start_job:
+        id: foo
+  - is_true: started
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      xpack.rollup.stop_job:
+        id: foo
+        wait_for_completion: true
+  - is_true: stopped
+
+---
+"Test wait_for_completion with custom timeout":
+  - skip:
+      version: " - 6.6.0"
+      reason: wait_for_completion option was added in 6.6
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      xpack.rollup.start_job:
+        id: foo
+  - is_true: started
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      xpack.rollup.stop_job:
+        id: foo
+        wait_for_completion: true
+        timeout: "5s"
+  - is_true: stopped
+