Просмотр исходного кода

[reindex] Dynamic throttle!

This allows the user to update the reindex throttle on the fly, with changes
that speed up the throttling being applied immediately and changes that
slow down the throttling being applied during the next batch. This means
that if a user throttles reindex in such a way that it tries to sleep for
16 years and then realizes that they've done something wrong then they
can change the throttle and reindex will wake up again. We don't apply
slow downs immediately so we never get in danger of losing the scan context.

Also, if reindex is canceled while it is sleeping (which is how it honors throttling)
then it'll immediately wake up and cancel itself.
Nik Everett 9 лет назад
Родитель
Сommit
78ab6c5b7f
24 измененных файлов с 1046 добавлено и 75 удалено
  1. 7 0
      core/src/main/java/org/elasticsearch/tasks/CancellableTask.java
  2. 52 42
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java
  3. 23 1
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java
  4. 1 1
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java
  5. 173 8
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java
  6. 2 0
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java
  7. 56 0
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestRethrottleAction.java
  8. 43 0
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleAction.java
  9. 68 0
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleRequest.java
  10. 43 0
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleRequestBuilder.java
  11. 70 0
      modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java
  12. 1 1
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollActionTestCase.java
  13. 26 10
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java
  14. 98 9
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java
  15. 1 1
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTestUtils.java
  16. 5 1
      modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java
  17. 1 0
      modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml
  18. 12 0
      modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yaml
  19. 174 0
      modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_throttle.yaml
  20. 1 0
      modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/10_basic.yaml
  21. 14 0
      modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/20_validation.yaml
  22. 150 0
      modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/70_throttle.yaml
  23. 24 0
      rest-api-spec/src/main/resources/rest-api-spec/api/reindex.rethrottle.json
  24. 1 1
      rest-api-spec/src/main/resources/rest-api-spec/api/tasks.cancel.json

+ 7 - 0
core/src/main/java/org/elasticsearch/tasks/CancellableTask.java

@@ -44,6 +44,7 @@ public class CancellableTask extends Task {
     final void cancel(String reason) {
         assert reason != null;
         this.reason.compareAndSet(null, reason);
+        onCancelled();
     }
 
     /**
@@ -65,4 +66,10 @@ public class CancellableTask extends Task {
     public String getReasonCancelled() {
         return reason.get();
     }
+
+    /**
+     * Called after the task is cancelled so that it can take any actions that it has to take.
+     */
+    protected void onCancelled() {
+    }
 }

+ 52 - 42
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

@@ -70,8 +70,8 @@ import static org.elasticsearch.rest.RestStatus.CONFLICT;
 import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
 
 /**
- * Abstract base for scrolling across a search and executing bulk actions on all
- * results. All package private methods are package private so their tests can use them.
+ * Abstract base for scrolling across a search and executing bulk actions on all results. All package private methods are package private so
+ * their tests can use them. Most methods run in the listener thread pool because they are meant to be fast and don't expect to block.
  */
 public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>, Response> {
     /**
@@ -173,52 +173,62 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
             total = min(total, mainRequest.getSize());
         }
         task.setTotal(total);
-        task.countThrottle(delay);
-        threadPool.schedule(delay, ThreadPool.Names.GENERIC, threadPool.getThreadContext().preserveContext(new AbstractRunnable() {
+        AbstractRunnable prepareBulkRequestRunnable = new AbstractRunnable() {
             @Override
             protected void doRun() throws Exception {
-                if (task.isCancelled()) {
-                    finishHim(null);
-                    return;
-                }
-                lastBatchStartTime.set(System.nanoTime());
-                SearchHit[] docs = searchResponse.getHits().getHits();
-                logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId());
-                if (docs.length == 0) {
-                    startNormalTermination(emptyList(), emptyList(), false);
-                    return;
-                }
-                task.countBatch();
-                List<SearchHit> docsIterable = Arrays.asList(docs);
-                if (mainRequest.getSize() != SIZE_ALL_MATCHES) {
-                    // Truncate the docs if we have more than the request size
-                    long remaining = max(0, mainRequest.getSize() - task.getSuccessfullyProcessed());
-                    if (remaining < docs.length) {
-                        docsIterable = docsIterable.subList(0, (int) remaining);
-                    }
-                }
-                BulkRequest request = buildBulk(docsIterable);
-                if (request.requests().isEmpty()) {
-                    /*
-                     * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
-                     */
-                    startNextScroll(0);
-                    return;
-                }
-                request.timeout(mainRequest.getTimeout());
-                request.consistencyLevel(mainRequest.getConsistency());
-                if (logger.isDebugEnabled()) {
-                    logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(),
-                            new ByteSizeValue(request.estimatedSizeInBytes()));
-                }
-                sendBulkRequest(request);
+                prepareBulkRequest(searchResponse);
             }
 
             @Override
             public void onFailure(Throwable t) {
                 finishHim(t);
             }
-        }));
+        };
+        prepareBulkRequestRunnable = (AbstractRunnable) threadPool.getThreadContext().preserveContext(prepareBulkRequestRunnable);
+        task.delayPrepareBulkRequest(threadPool, delay, prepareBulkRequestRunnable);
+    }
+
+    /**
+     * Prepare the bulk request. Called on the generic thread pool after some preflight checks have been done on the SearchResponse and any
+     * delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
+     * thread may be blocked by the user script.
+     */
+    void prepareBulkRequest(SearchResponse searchResponse) {
+        if (task.isCancelled()) {
+            finishHim(null);
+            return;
+        }
+        lastBatchStartTime.set(System.nanoTime());
+        SearchHit[] docs = searchResponse.getHits().getHits();
+        logger.debug("scroll returned [{}] documents with a scroll id of [{}]", docs.length, searchResponse.getScrollId());
+        if (docs.length == 0) {
+            startNormalTermination(emptyList(), emptyList(), false);
+            return;
+        }
+        task.countBatch();
+        List<SearchHit> docsIterable = Arrays.asList(docs);
+        if (mainRequest.getSize() != SIZE_ALL_MATCHES) {
+            // Truncate the docs if we have more than the request size
+            long remaining = max(0, mainRequest.getSize() - task.getSuccessfullyProcessed());
+            if (remaining < docs.length) {
+                docsIterable = docsIterable.subList(0, (int) remaining);
+            }
+        }
+        BulkRequest request = buildBulk(docsIterable);
+        if (request.requests().isEmpty()) {
+            /*
+             * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
+             */
+            startNextScroll(0);
+            return;
+        }
+        request.timeout(mainRequest.getTimeout());
+        request.consistencyLevel(mainRequest.getConsistency());
+        if (logger.isDebugEnabled()) {
+            logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(),
+                    new ByteSizeValue(request.estimatedSizeInBytes()));
+        }
+        sendBulkRequest(request);
     }
 
     /**
@@ -329,13 +339,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * How many nanoseconds should a batch of lastBatchSize have taken if it were perfectly throttled? Package private for testing.
      */
     float perfectlyThrottledBatchTime(int lastBatchSize) {
-        if (mainRequest.getRequestsPerSecond() == 0) {
+        if (task.getRequestsPerSecond() == 0) {
             return 0;
         }
         //       requests
         // ------------------- == seconds
         // request per seconds
-        float targetBatchTimeInSeconds = lastBatchSize / mainRequest.getRequestsPerSecond();
+        float targetBatchTimeInSeconds = lastBatchSize / task.getRequestsPerSecond();
         // nanoseconds per seconds * seconds == nanoseconds
         return TimeUnit.SECONDS.toNanos(1) * targetBatchTimeInSeconds;
     }

+ 23 - 1
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java

@@ -43,6 +43,24 @@ public abstract class AbstractBaseReindexRestHandler<
                 Response extends BulkIndexByScrollResponse,
                 TA extends TransportAction<Request, Response>
             > extends BaseRestHandler {
+
+    /**
+     * @return requests_per_second from the request as a float if it was on the request, null otherwise
+     */
+    public static Float parseRequestsPerSecond(RestRequest request) {
+        String requestsPerSecond = request.param("requests_per_second");
+        if (requestsPerSecond == null) {
+            return null;
+        }
+        if ("".equals(requestsPerSecond)) {
+            throw new IllegalArgumentException("requests_per_second cannot be an empty string");
+        }
+        if ("unlimited".equals(requestsPerSecond)) {
+            return 0f;
+        }
+        return Float.parseFloat(requestsPerSecond);
+    }
+
     protected final IndicesQueriesRegistry indicesQueriesRegistry;
     protected final AggregatorParsers aggParsers;
     protected final Suggesters suggesters;
@@ -61,7 +79,11 @@ public abstract class AbstractBaseReindexRestHandler<
     }
 
     protected void execute(RestRequest request, Request internalRequest, RestChannel channel) throws IOException {
-        internalRequest.setRequestsPerSecond(request.paramAsFloat("requests_per_second", internalRequest.getRequestsPerSecond()));
+        Float requestsPerSecond = parseRequestsPerSecond(request);
+        if (requestsPerSecond != null) {
+            internalRequest.setRequestsPerSecond(requestsPerSecond);
+        }
+
         if (request.paramAsBoolean("wait_for_completion", true)) {
             action.execute(internalRequest, new BulkIndexByScrollResponseContentListener<Response>(channel));
             return;

+ 1 - 1
modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java

@@ -276,7 +276,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
 
     @Override
     public Task createTask(long id, String type, String action) {
-        return new BulkByScrollTask(id, type, action, getDescription());
+        return new BulkByScrollTask(id, type, action, getDescription(), requestsPerSecond);
     }
 
     @Override

+ 173 - 8
modules/reindex/src/main/java/org/elasticsearch/index/reindex/BulkByScrollTask.java

@@ -23,14 +23,22 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
+import static java.lang.Math.round;
 import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
 
 /**
@@ -50,15 +58,42 @@ public class BulkByScrollTask extends CancellableTask {
     private final AtomicLong versionConflicts = new AtomicLong(0);
     private final AtomicLong retries = new AtomicLong(0);
     private final AtomicLong throttledNanos = new AtomicLong();
+    /**
+     * The number of requests per second to which to throttle the request that this task represents. The other variables are all AtomicXXX
+     * style variables but there isn't an AtomicFloat so we just use a volatile.
+     */
+    private volatile float requestsPerSecond;
+    /**
+     * Reference to the last delayed prepareBulkRequest call, if any. Used during rethrottling and canceling to reschedule the request.
+     */
+    private final AtomicReference<DelayedPrepareBulkRequest> delayedPrepareBulkRequestReference = new AtomicReference<>();
 
-    public BulkByScrollTask(long id, String type, String action, String description) {
+    public BulkByScrollTask(long id, String type, String action, String description, float requestsPerSecond) {
         super(id, type, action, description);
+        setRequestsPerSecond(requestsPerSecond);
+    }
+
+    @Override
+    protected void onCancelled() {
+        // Drop the throttle to 0, immediately rescheduling all outstanding tasks so the task will wake up and cancel itself.
+        rethrottle(0);
     }
 
     @Override
     public Status getStatus() {
         return new Status(total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(), noops.get(),
-                retries.get(), timeValueNanos(throttledNanos.get()), getReasonCancelled());
+                retries.get(), timeValueNanos(throttledNanos.get()), getRequestsPerSecond(), getReasonCancelled(), throttledUntil());
+    }
+
+    private TimeValue throttledUntil() {
+        DelayedPrepareBulkRequest delayed = delayedPrepareBulkRequestReference.get();
+        if (delayed == null) {
+            return timeValueNanos(0);
+        }
+        if (delayed.future == null) {
+            return timeValueNanos(0);
+        }
+        return timeValueNanos(delayed.future.getDelay(TimeUnit.NANOSECONDS));
     }
 
     /**
@@ -70,6 +105,7 @@ public class BulkByScrollTask extends CancellableTask {
 
     public static class Status implements Task.Status {
         public static final String NAME = "bulk-by-scroll";
+
         private final long total;
         private final long updated;
         private final long created;
@@ -79,10 +115,12 @@ public class BulkByScrollTask extends CancellableTask {
         private final long noops;
         private final long retries;
         private final TimeValue throttled;
+        private final float requestsPerSecond;
         private final String reasonCancelled;
+        private final TimeValue throttledUntil;
 
         public Status(long total, long updated, long created, long deleted, int batches, long versionConflicts, long noops, long retries,
-                TimeValue throttled, @Nullable String reasonCancelled) {
+                TimeValue throttled, float requestsPerSecond, @Nullable String reasonCancelled, TimeValue throttledUntil) {
             this.total = checkPositive(total, "total");
             this.updated = checkPositive(updated, "updated");
             this.created = checkPositive(created, "created");
@@ -92,7 +130,9 @@ public class BulkByScrollTask extends CancellableTask {
             this.noops = checkPositive(noops, "noops");
             this.retries = checkPositive(retries, "retries");
             this.throttled = throttled;
+            this.requestsPerSecond = requestsPerSecond;
             this.reasonCancelled = reasonCancelled;
+            this.throttledUntil = throttledUntil;
         }
 
         public Status(StreamInput in) throws IOException {
@@ -105,7 +145,9 @@ public class BulkByScrollTask extends CancellableTask {
             noops = in.readVLong();
             retries = in.readVLong();
             throttled = TimeValue.readTimeValue(in);
+            requestsPerSecond = in.readFloat();
             reasonCancelled = in.readOptionalString();
+            throttledUntil = TimeValue.readTimeValue(in);
         }
 
         @Override
@@ -119,7 +161,9 @@ public class BulkByScrollTask extends CancellableTask {
             out.writeVLong(noops);
             out.writeVLong(retries);
             throttled.writeTo(out);
+            out.writeFloat(requestsPerSecond);
             out.writeOptionalString(reasonCancelled);
+            throttledUntil.writeTo(out);
         }
 
         @Override
@@ -144,9 +188,11 @@ public class BulkByScrollTask extends CancellableTask {
             builder.field("noops", noops);
             builder.field("retries", retries);
             builder.timeValueField("throttled_millis", "throttled", throttled);
+            builder.field("requests_per_second", requestsPerSecond == 0 ? "unlimited" : requestsPerSecond);
             if (reasonCancelled != null) {
                 builder.field("canceled", reasonCancelled);
             }
+            builder.timeValueField("throttled_until_millis", "throttled_until", throttledUntil);
             return builder;
         }
 
@@ -173,6 +219,7 @@ public class BulkByScrollTask extends CancellableTask {
             if (reasonCancelled != null) {
                 builder.append(",canceled=").append(reasonCancelled);
             }
+            builder.append(",throttledUntil=").append(throttledUntil);
         }
 
         @Override
@@ -238,12 +285,19 @@ public class BulkByScrollTask extends CancellableTask {
         }
 
         /**
-         * The total time this request has throttled itself.
+         * The total time this request has throttled itself not including the current throttle time if it is currently sleeping.
          */
         public TimeValue getThrottled() {
             return throttled;
         }
 
+        /**
+         * The number of requests per second to which to throttle the request. 0 means unlimited.
+         */
+        public float getRequestsPerSecond() {
+            return requestsPerSecond;
+        }
+
         /**
          * The reason that the request was canceled or null if it hasn't been.
          */
@@ -251,6 +305,13 @@ public class BulkByScrollTask extends CancellableTask {
             return reasonCancelled;
         }
 
+        /**
+         * Remaining delay of any current throttle sleep or 0 if not sleeping.
+         */
+        public TimeValue getThrottledUntil() {
+            return throttledUntil;
+        }
+
         private int checkPositive(int value, String name) {
             if (value < 0) {
                 throw new IllegalArgumentException(name + " must be greater than 0 but was [" + value + "]");
@@ -298,10 +359,114 @@ public class BulkByScrollTask extends CancellableTask {
         retries.incrementAndGet();
     }
 
-    public void countThrottle(TimeValue delay) {
-        long nanos = delay.nanos();
-        if (nanos > 0) {
-            throttledNanos.addAndGet(nanos);
+    float getRequestsPerSecond() {
+        return requestsPerSecond;
+    }
+
+    /**
+     * Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
+     * rescheduled over and over again.
+     */
+    void delayPrepareBulkRequest(ThreadPool threadPool, TimeValue delay, AbstractRunnable prepareBulkRequestRunnable) {
+        // Synchronize so we are less likely to schedule the same request twice.
+        synchronized (delayedPrepareBulkRequestReference) {
+            AbstractRunnable oneTime = new AbstractRunnable() {
+                private final AtomicBoolean hasRun = new AtomicBoolean(false);
+
+                @Override
+                protected void doRun() throws Exception {
+                    // Paranoia to prevent furiously rethrottling from running the command multiple times. Without this we totally can.
+                    if (hasRun.compareAndSet(false, true)) {
+                        prepareBulkRequestRunnable.run();
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    prepareBulkRequestRunnable.onFailure(t);
+                }
+            };
+            delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(), delay, oneTime));
+        }
+    }
+
+    private void setRequestsPerSecond(float requestsPerSecond) {
+        if (requestsPerSecond == -1) {
+            requestsPerSecond = 0;
+        }
+        this.requestsPerSecond = requestsPerSecond;
+    }
+
+    void rethrottle(float newRequestsPerSecond) {
+        synchronized (delayedPrepareBulkRequestReference) {
+            setRequestsPerSecond(newRequestsPerSecond);
+
+            DelayedPrepareBulkRequest delayedPrepareBulkRequest = this.delayedPrepareBulkRequestReference.get();
+            if (delayedPrepareBulkRequest == null) {
+                // No request has been queued yet so nothing to reschedule.
+                return;
+            }
+
+            this.delayedPrepareBulkRequestReference.set(delayedPrepareBulkRequest.rethrottle(newRequestsPerSecond));
+        }
+    }
+
+    class DelayedPrepareBulkRequest {
+        private final ThreadPool threadPool;
+        private final AbstractRunnable command;
+        private final float requestsPerSecond;
+        private final ScheduledFuture<?> future;
+
+        DelayedPrepareBulkRequest(ThreadPool threadPool, float requestsPerSecond, TimeValue delay, AbstractRunnable command) {
+            this.threadPool = threadPool;
+            this.requestsPerSecond = requestsPerSecond;
+            this.command = command;
+            this.future = threadPool.schedule(delay, ThreadPool.Names.GENERIC, new AbstractRunnable() {
+                @Override
+                protected void doRun() throws Exception {
+                    throttledNanos.addAndGet(delay.nanos());
+                    command.run();
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    command.onFailure(t);
+                }
+            });
+        }
+
+        DelayedPrepareBulkRequest rethrottle(float newRequestsPerSecond) {
+            if (newRequestsPerSecond != 0 && newRequestsPerSecond < requestsPerSecond) {
+                /*
+                 * The user is attempting to slow the request down. We'll let the change in throttle take effect the next time we delay
+                 * prepareBulkRequest. We can't just reschedule the request further out in the future or the bulk context might time out.
+                 */
+                return this;
+            }
+
+            long remainingDelay = future.getDelay(TimeUnit.NANOSECONDS);
+            // Actually reschedule the task
+            if (false == FutureUtils.cancel(future)) {
+                // Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here.
+                return this;
+            }
+
+            /*
+             * Strangely enough getting here doesn't mean that you actually cancelled the request, just that you probably did. If you stress
+             * test it you'll find that requests sneak through. So each request is guarded by a hasRun boolean to prevent that.
+             */
+            TimeValue newDelay = newDelay(remainingDelay, newRequestsPerSecond);
+            return new DelayedPrepareBulkRequest(threadPool, requestsPerSecond, newDelay, command);
+        }
+
+        /**
+         * Scale back remaining delay to fit the new delay.
+         */
+        TimeValue newDelay(long remainingDelay, float newRequestsPerSecond) {
+            if (remainingDelay < 0 || newRequestsPerSecond == 0) {
+                return timeValueNanos(0);
+            }
+            return timeValueNanos(round(remainingDelay * requestsPerSecond / newRequestsPerSecond));
         }
     }
 }

+ 2 - 0
modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java

@@ -39,11 +39,13 @@ public class ReindexPlugin extends Plugin {
     public void onModule(ActionModule actionModule) {
         actionModule.registerAction(ReindexAction.INSTANCE, TransportReindexAction.class);
         actionModule.registerAction(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class);
+        actionModule.registerAction(RethrottleAction.INSTANCE, TransportRethrottleAction.class);
     }
 
     public void onModule(NetworkModule networkModule) {
         networkModule.registerRestHandler(RestReindexAction.class);
         networkModule.registerRestHandler(RestUpdateByQueryAction.class);
+        networkModule.registerRestHandler(RestRethrottleAction.class);
         networkModule.registerTaskStatus(BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new);
     }
 }

+ 56 - 0
modules/reindex/src/main/java/org/elasticsearch/index/reindex/RestRethrottleAction.java

@@ -0,0 +1,56 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.reindex;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestChannel;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.support.RestToXContentListener;
+import org.elasticsearch.tasks.TaskId;
+
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+
+public class RestRethrottleAction extends BaseRestHandler {
+    private final TransportRethrottleAction action;
+
+    @Inject
+    public RestRethrottleAction(Settings settings, RestController controller, Client client, TransportRethrottleAction action) {
+        super(settings, client);
+        this.action = action;
+        controller.registerHandler(POST, "/_update_by_query/{taskId}/_rethrottle", this);
+        controller.registerHandler(POST, "/_reindex/{taskId}/_rethrottle", this);
+    }
+
+    @Override
+    public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
+        RethrottleRequest internalRequest = new RethrottleRequest();
+        internalRequest.setTaskId(new TaskId(request.param("taskId")));
+        Float requestsPerSecond = AbstractBaseReindexRestHandler.parseRequestsPerSecond(request);
+        if (requestsPerSecond == null) {
+            throw new IllegalArgumentException("requests_per_second is a required parameter");
+        }
+        internalRequest.setRequestsPerSecond(requestsPerSecond);
+        action.execute(internalRequest, new RestToXContentListener<>(channel));
+    }
+}

+ 43 - 0
modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleAction.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.reindex;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
+import org.elasticsearch.client.ElasticsearchClient;
+
+public class RethrottleAction extends Action<RethrottleRequest, ListTasksResponse, RethrottleRequestBuilder> {
+    public static final RethrottleAction INSTANCE = new RethrottleAction();
+    public static final String NAME = "cluster:admin/reindex/rethrottle";
+
+    private RethrottleAction() {
+        super(NAME);
+    }
+
+    @Override
+    public RethrottleRequestBuilder newRequestBuilder(ElasticsearchClient client) {
+        return new RethrottleRequestBuilder(client, this);
+    }
+
+    @Override
+    public ListTasksResponse newResponse() {
+        return new ListTasksResponse();
+    }
+}

+ 68 - 0
modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleRequest.java

@@ -0,0 +1,68 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.reindex;
+
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.support.tasks.BaseTasksRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+import static org.elasticsearch.action.ValidateActions.addValidationError;
+
+/**
+ * A request to change throttling on a task. The new throttle is written to and read from the transport stream so that it is still
+ * set when the request is forwarded to another node.
+ */
+public class RethrottleRequest extends BaseTasksRequest<RethrottleRequest> {
+    /**
+     * The throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default.
+     * Throttling is done between batches, as we start the next scroll requests. That way we can increase the scroll's timeout to make sure
+     * that it contains any time that we might wait.
+     */
+    private float requestsPerSecond = 0;
+
+    /**
+     * The throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default.
+     */
+    public float getRequestsPerSecond() {
+        return requestsPerSecond;
+    }
+
+    /**
+     * Set the throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default.
+     *
+     * @return this request, for chaining
+     */
+    public RethrottleRequest setRequestsPerSecond(float requestsPerSecond) {
+        this.requestsPerSecond = requestsPerSecond;
+        return this;
+    }
+
+    /**
+     * Read the request from the stream. The throttle comes after everything that the superclass reads, mirroring
+     * {@link #writeTo(StreamOutput)}.
+     */
+    @Override
+    public void readFrom(StreamInput in) throws IOException {
+        super.readFrom(in);
+        requestsPerSecond = in.readFloat();
+    }
+
+    /**
+     * Write the request to the stream. Without this the throttle would be dropped in transit and the receiver would see the default
+     * of 0, which means "no throttle".
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeFloat(requestsPerSecond);
+    }
+
+    /**
+     * Validate the request: on top of the superclass's validation every action listed in the request must be reindex or
+     * update-by-query.
+     *
+     * @return the accumulated validation errors or whatever the superclass returned if this class found nothing wrong
+     */
+    @Override
+    public ActionRequestValidationException validate() {
+        ActionRequestValidationException validationException = super.validate();
+        for (String action : getActions()) {
+            switch (action) {
+            case ReindexAction.NAME:
+            case UpdateByQueryAction.NAME:
+                // These are the only actions that can be rethrottled
+                continue;
+            default:
+                validationException = addValidationError(
+                        "Can only change the throttling on reindex or update-by-query. Not on [" + action + "]", validationException);
+            }
+        }
+        return validationException;
+    }
+}

+ 43 - 0
modules/reindex/src/main/java/org/elasticsearch/index/reindex/RethrottleRequestBuilder.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.reindex;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
+import org.elasticsearch.action.support.tasks.TasksRequestBuilder;
+import org.elasticsearch.client.ElasticsearchClient;
+
+/**
+ * Builder used by the Java API to change the throttle on reindex tasks that are already running.
+ */
+public class RethrottleRequestBuilder extends TasksRequestBuilder<RethrottleRequest, ListTasksResponse, RethrottleRequestBuilder> {
+    /**
+     * Build the builder around a fresh {@link RethrottleRequest}.
+     */
+    public RethrottleRequestBuilder(ElasticsearchClient client,
+            Action<RethrottleRequest, ListTasksResponse, RethrottleRequestBuilder> action) {
+        super(client, action, new RethrottleRequest());
+    }
+
+    /**
+     * Set the throttle to apply to all matching requests in sub-requests per second. 0 means set no throttle and that is the default.
+     *
+     * @return this builder, for chaining
+     */
+    public RethrottleRequestBuilder setRequestsPerSecond(float requestsPerSecond) {
+        request.setRequestsPerSecond(requestsPerSecond);
+        return this;
+    }
+}

+ 70 - 0
modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportRethrottleAction.java

@@ -0,0 +1,70 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.reindex;
+
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.tasks.TransportTasksAction;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Transport side of {@link RethrottleAction}: applies the requested throttle to each matching {@link BulkByScrollTask} and responds
+ * with the {@link TaskInfo} of every task it touched.
+ */
+public class TransportRethrottleAction extends TransportTasksAction<BulkByScrollTask, RethrottleRequest, ListTasksResponse, TaskInfo> {
+    @Inject
+    public TransportRethrottleAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, ClusterService clusterService,
+            TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+        super(settings, RethrottleAction.NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
+                indexNameExpressionResolver, RethrottleRequest::new, ListTasksResponse::new, ThreadPool.Names.MANAGEMENT);
+    }
+
+    @Override
+    protected boolean accumulateExceptions() {
+        return true;
+    }
+
+    /**
+     * Rethrottle a single task and report its info afterwards.
+     */
+    @Override
+    protected TaskInfo taskOperation(RethrottleRequest request, BulkByScrollTask task) {
+        task.rethrottle(request.getRequestsPerSecond());
+        // Always send the status back. The user might not want it but they likely do and it is cheap.
+        return task.taskInfo(clusterService.localNode(), true);
+    }
+
+    /**
+     * Read the per-task response from the stream.
+     */
+    @Override
+    protected TaskInfo readTaskResponse(StreamInput in) throws IOException {
+        return new TaskInfo(in);
+    }
+
+    /**
+     * Combine the per-task results and any failures into the response.
+     */
+    @Override
+    protected ListTasksResponse newResponse(RethrottleRequest request, List<TaskInfo> tasks,
+            List<TaskOperationFailure> taskOperationFailures, List<FailedNodeException> failedNodeExceptions) {
+        return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
+    }
+}

+ 1 - 1
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollActionTestCase.java

@@ -35,7 +35,7 @@ public abstract class AbstractAsyncBulkIndexByScrollActionTestCase<
     @Before
     public void setupForTest() {
         threadPool = new ThreadPool(getTestName());
-        task = new BulkByScrollTask(1, "test", "test", "test");
+        task = new BulkByScrollTask(1, "test", "test", "test", 0);
     }
 
     @After

+ 26 - 10
modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java

@@ -88,6 +88,7 @@ import static org.apache.lucene.util.TestUtil.randomSimpleString;
 import static org.elasticsearch.action.bulk.BackoffPolicy.constantBackoff;
 import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
+import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
 import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.contains;
@@ -263,8 +264,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         }
         assertThat(client.scrollsCleared, contains(scrollId));
 
-        // While we're mocking the threadPool lets also check that we incremented the throttle counter
-        assertEquals(expectedDelay, task.getStatus().getThrottled());
+        // When the task is rejected we don't increment the throttled timer
+        assertEquals(timeValueMillis(0), task.getStatus().getThrottled());
     }
 
     /**
@@ -362,7 +363,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         assertThat((double) action.perfectlyThrottledBatchTime(randomInt()), closeTo(0f, 0f));
 
         int total = between(0, 1000000);
-        mainRequest.setRequestsPerSecond(1);
+        task.rethrottle(1);
         assertThat((double) action.perfectlyThrottledBatchTime(total),
                 closeTo(TimeUnit.SECONDS.toNanos(total), TimeUnit.SECONDS.toNanos(1)));
     }
@@ -373,11 +374,13 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
          * delay for throttling.
          */
         AtomicReference<TimeValue> capturedDelay = new AtomicReference<>();
+        AtomicReference<Runnable> capturedCommand = new AtomicReference<>();
         threadPool.shutdown();
         threadPool = new ThreadPool(getTestName()) {
             @Override
             public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
                 capturedDelay.set(delay);
+                capturedCommand.set(command);
                 return null;
             }
         };
@@ -386,7 +389,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         action.setScroll(scrollId());
 
         // We'd like to get about 1 request a second
-        mainRequest.setRequestsPerSecond(1f);
+        task.rethrottle(1f);
         // Make the last scroll look nearly instant
         action.setLastBatchStartTime(System.nanoTime());
         // The last batch had 100 documents
@@ -403,6 +406,10 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
 
         // The delay is still 100ish seconds because there hasn't been much time between when we requested the bulk and when we got it.
         assertThat(capturedDelay.get().seconds(), either(equalTo(100L)).or(equalTo(99L)));
+
+        // Running the command ought to increment the delay counter on the task.
+        capturedCommand.get().run();
+        assertEquals(capturedDelay.get(), task.getStatus().getThrottled());
     }
 
     private long retryTestCase(boolean failWithRejection) throws Exception {
@@ -539,9 +546,17 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         threadPool = new ThreadPool(getTestName()) {
             @Override
             public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
-                taskManager.cancel(task, reason, (Set<String> s) -> {});
-                command.run();
-                return null;
+                /*
+                 * This is called twice:
+                 * 1. To schedule the throttling. When that happens we immediately cancel the task.
+                 * 2. After the task is canceled.
+                 * Both times we delegate to the standard behavior so the task is scheduled as expected so it can be cancelled and all
+                 * that good stuff.
+                 */
+                if (delay.nanos() > 0) {
+                    generic().execute(() -> taskManager.cancel(task, reason, (Set<String> s) -> {}));
+                }
+                return super.schedule(delay, name, command);
             }
         };
 
@@ -554,10 +569,11 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
         long total = randomIntBetween(0, Integer.MAX_VALUE);
         InternalSearchHits hits = new InternalSearchHits(null, total, 0);
         InternalSearchResponse searchResponse = new InternalSearchResponse(hits, null, null, null, false, false);
-        action.onScrollResponse(timeValueSeconds(0), new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
+        // Use a long delay here so the test will time out if the cancellation doesn't reschedule the throttled task
+        action.onScrollResponse(timeValueMinutes(10), new SearchResponse(searchResponse, scrollId(), 5, 4, randomLong(), null));
 
-        // Now that we've got our cancel we'll just verify that it all came through allright
-        assertEquals(reason, listener.get().getReasonCancelled());
+        // Now that we've got our cancel we'll just verify that it all came through all right
+        assertEquals(reason, listener.get(10, TimeUnit.SECONDS).getReasonCancelled());
         if (previousScrollSet) {
             // Canceled tasks always start to clear the scroll before they die.
             assertThat(client.scrollsCleared, contains(scrollId));

+ 98 - 9
modules/reindex/src/test/java/org/elasticsearch/index/reindex/BulkByScrollTaskTests.java

@@ -20,17 +20,31 @@
 package org.elasticsearch.index.reindex;
 
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.Before;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
+import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 public class BulkByScrollTaskTests extends ESTestCase {
     private BulkByScrollTask task;
 
     @Before
     public void createTask() {
-        task = new BulkByScrollTask(1, "test_type", "test_action", "test");
+        task = new BulkByScrollTask(1, "test_type", "test_action", "test", 0);
     }
 
     public void testBasicData() {
@@ -104,14 +118,89 @@ public class BulkByScrollTaskTests extends ESTestCase {
     }
 
     public void testStatusHatesNegatives() {
+        // Each call passes -1 in a different argument position of the status(...) helper and expects the returned runnable to throw.
+        expectThrows(IllegalArgumentException.class, status(-1, 0, 0, 0, 0, 0, 0, 0));
+        expectThrows(IllegalArgumentException.class, status(0, -1, 0, 0, 0, 0, 0, 0));
+        expectThrows(IllegalArgumentException.class, status(0, 0, -1, 0, 0, 0, 0, 0));
+        expectThrows(IllegalArgumentException.class, status(0, 0, 0, -1, 0, 0, 0, 0));
+        expectThrows(IllegalArgumentException.class, status(0, 0, 0, 0, -1, 0, 0, 0));
+        expectThrows(IllegalArgumentException.class, status(0, 0, 0, 0, 0, -1, 0, 0));
+        expectThrows(IllegalArgumentException.class, status(0, 0, 0, 0, 0, 0, -1, 0));
+        expectThrows(IllegalArgumentException.class, status(0, 0, 0, 0, 0, 0, 0, -1));
+    }
+
+    /**
+     * Build a runnable that constructs a task status from the provided counters, filling in the throttle times with random positive
+     * values. Used for testing negative values: the construction is deferred so the caller can assert that it throws.
+     */
+    private ThrowingRunnable status(long total, long updated, long created, long deleted, int batches, long versionConflicts,
+            long noops, long retries) {
         TimeValue throttle = parseTimeValue(randomPositiveTimeValue(), "test");
-        expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(-1, 0, 0, 0, 0, 0, 0, 0, throttle, null));
-        expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, -1, 0, 0, 0, 0, 0, 0, throttle, null));
-        expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, -1, 0, 0, 0, 0, 0, throttle, null));
-        expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, -1, 0, 0, 0, 0, throttle, null));
-        expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, -1, 0, 0, 0, throttle, null));
-        expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, -1, 0, 0, throttle, null));
-        expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, -1, 0, throttle, null));
-        expectThrows(IllegalArgumentException.class, () -> new BulkByScrollTask.Status(0, 0, 0, 0, 0, 0, 0, -1, throttle, null));
+        TimeValue throttledUntil = parseTimeValue(randomPositiveTimeValue(), "test");
+
+        // Forward every argument. Hard coding them would mean that only the check on total is ever exercised.
+        return () -> new BulkByScrollTask.Status(total, updated, created, deleted, batches, versionConflicts, noops, retries, throttle,
+                0f, null, throttledUntil);
+    }
+
+    /**
+     * Furiously rethrottles a delayed request to make sure that we never run it twice. Any failure, whether from the delayed command
+     * or from a rethrottling thread, is collected and the test asserts at the end that none were collected.
+     */
+    public void testDelayAndRethrottle() throws IOException, InterruptedException {
+        List<Throwable> errors = new CopyOnWriteArrayList<>();
+        AtomicBoolean done = new AtomicBoolean();
+        int threads = between(1, 10);
+
+        /*
+         * We never end up waiting this long because the test rethrottles over and over again, ratcheting down the delay a random amount
+         * each time.
+         */
+        float originalRequestsPerSecond = (float) randomDoubleBetween(0, 10000, true);
+        task.rethrottle(originalRequestsPerSecond);
+        TimeValue maxDelay = timeValueSeconds(between(1, 5));
+        assertThat(maxDelay.nanos(), greaterThanOrEqualTo(0L));
+        // Wrap scheduling so every delay that gets scheduled is checked to be between 0 and maxDelay before it is really scheduled.
+        ThreadPool threadPool = new ThreadPool(getTestName()) {
+            @Override
+            public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
+                assertThat(delay.nanos(), both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(maxDelay.nanos())));
+                return super.schedule(delay, name, command);
+            }
+        };
+        try {
+            task.delayPrepareBulkRequest(threadPool, maxDelay, new AbstractRunnable() {
+                @Override
+                protected void doRun() throws Exception {
+                    // Flip the flag atomically so a second run is detected even if two runs race each other.
+                    boolean oldValue = done.getAndSet(true);
+                    if (oldValue) {
+                        throw new RuntimeException("Ran twice oh no!");
+                    }
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    errors.add(t);
+                }
+            });
+
+            // Rethrottle on a random number of threads, one of which is this thread.
+            Runnable test = () -> {
+                try {
+                    int rethrottles = 0;
+                    // Keep rethrottling until the delayed command has run.
+                    while (false == done.get()) {
+                        float requestsPerSecond = (float) randomDoubleBetween(0, originalRequestsPerSecond * 2, true);
+                        task.rethrottle(requestsPerSecond);
+                        rethrottles += 1;
+                    }
+                    logger.info("Rethrottled [{}] times", rethrottles);
+                } catch (Exception e) {
+                    errors.add(e);
+                }
+            };
+            // Start at 1 because the current thread runs the loop as well.
+            for (int i = 1; i < threads; i++) {
+                threadPool.generic().execute(test);
+            }
+            test.run();
+        } finally {
+            // Other threads should finish up quickly as they are checking the same AtomicBoolean.
+            threadPool.shutdown();
+            threadPool.awaitTermination(10, TimeUnit.SECONDS);
+        }
+        assertThat(errors, empty());
     }
 }

+ 1 - 1
modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTestUtils.java

@@ -51,7 +51,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 /**
- * Utilities for testing reindex and update-by-query cancelation. This whole class isn't thread safe. Luckily we run out tests in separate
+ * Utilities for testing reindex and update-by-query cancellation. This whole class isn't thread safe. Luckily we run our tests in separate
  * jvms.
  */
 public class CancelTestUtils {

+ 5 - 1
modules/reindex/src/test/java/org/elasticsearch/index/reindex/RoundTripTests.java

@@ -38,6 +38,7 @@ import org.elasticsearch.test.ESTestCase;
 import java.io.IOException;
 import java.util.List;
 
+import static java.lang.Math.abs;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
@@ -122,7 +123,8 @@ public class RoundTripTests extends ESTestCase {
     private BulkByScrollTask.Status randomStatus() {
         return new BulkByScrollTask.Status(randomPositiveLong(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
                 randomPositiveInt(), randomPositiveLong(), randomPositiveLong(), randomPositiveLong(),
-                parseTimeValue(randomPositiveTimeValue(), "test"), random().nextBoolean() ? null : randomSimpleString(random()));
+                parseTimeValue(randomPositiveTimeValue(), "test"), abs(random().nextFloat()),
+                random().nextBoolean() ? null : randomSimpleString(random()), parseTimeValue(randomPositiveTimeValue(), "test"));
     }
 
     private List<Failure> randomIndexingFailures() {
@@ -198,6 +200,8 @@ public class RoundTripTests extends ESTestCase {
         assertEquals(expected.getNoops(), actual.getNoops());
         assertEquals(expected.getRetries(), actual.getRetries());
         assertEquals(expected.getThrottled(), actual.getThrottled());
+        assertEquals(expected.getRequestsPerSecond(), actual.getRequestsPerSecond(), 0f);
         assertEquals(expected.getReasonCancelled(), actual.getReasonCancelled());
+        assertEquals(expected.getThrottledUntil(), actual.getThrottledUntil());
     }
 }

+ 1 - 0
modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml

@@ -93,6 +93,7 @@
       tasks.list:
         wait_for_completion: true
         task_id: $task
+  - is_false: node_failures
 
 ---
 "Response format for version conflict":

+ 12 - 0
modules/reindex/src/test/resources/rest-api-spec/test/reindex/20_validation.yaml

@@ -148,3 +148,15 @@
           dest:
             index: dest
             timestamp: "123"
+
+---
+"requests_per_second cannot be an empty string":
+  - do:
+      catch: /requests_per_second cannot be an empty string/
+      reindex:
+        requests_per_second: ""
+        body:
+          source:
+            from: 1
+          dest:
+            index: dest

+ 174 - 0
modules/reindex/src/test/resources/rest-api-spec/test/reindex/80_throttle.yaml

@@ -51,3 +51,177 @@
   - lt: {throttled_millis: 4000}
   - gte: { took: 1000 }
   - is_false: task
+
+---
+"Rethrottle":
+  # Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
+  # and a small batch size on the request
+  - do:
+      indices.create:
+        index: source
+        body:
+          settings:
+            number_of_shards: "1"
+            number_of_replicas: "0"
+  - do:
+      cluster.health:
+          wait_for_status: yellow
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      1
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      2
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      3
+        body:    { "text": "test" }
+  - do:
+      indices.refresh: {}
+
+  - do:
+      reindex:
+        requests_per_second: .00000001 # About 9.5 years to complete the request
+        wait_for_completion: false
+        body:
+          source:
+            index: source
+            size: 1
+          dest:
+            index: dest
+  - match: {task: '/.+:\d+/'}
+  - set: {task: task}
+
+  - do:
+      reindex.rethrottle:
+        requests_per_second: unlimited
+        task_id: $task
+
+  - do:
+      tasks.list:
+        wait_for_completion: true
+        task_id: $task
+
+---
+"Rethrottle to -1 which also means unlimited":
+  # Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
+  # and a small batch size on the request
+  - do:
+      indices.create:
+        index: source
+        body:
+          settings:
+            number_of_shards: "1"
+            number_of_replicas: "0"
+  - do:
+      cluster.health:
+          wait_for_status: yellow
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      1
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      2
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      3
+        body:    { "text": "test" }
+  - do:
+      indices.refresh: {}
+
+  - do:
+      reindex:
+        requests_per_second: .00000001 # About 9.5 years to complete the request
+        wait_for_completion: false
+        body:
+          source:
+            index: source
+            size: 1
+          dest:
+            index: dest
+  - match: {task: '/.+:\d+/'}
+  - set: {task: task}
+
+  - do:
+      reindex.rethrottle:
+        requests_per_second: -1
+        task_id: $task
+
+  - do:
+      tasks.list:
+        wait_for_completion: true
+        task_id: $task
+
+---
+"Rethrottle but not unlimited":
+  # Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
+  # and a small batch size on the request
+  - do:
+      indices.create:
+        index: source
+        body:
+          settings:
+            number_of_shards: "1"
+            number_of_replicas: "0"
+  - do:
+      cluster.health:
+          wait_for_status: yellow
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      1
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      2
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   source
+        type:    foo
+        id:      3
+        body:    { "text": "test" }
+  - do:
+      indices.refresh: {}
+
+  - do:
+      reindex:
+        requests_per_second: .00000001 # About 9.5 years to complete the request
+        wait_for_completion: false
+        body:
+          source:
+            index: source
+            size: 1
+          dest:
+            index: dest
+  - match: {task: '/.+:\d+/'}
+  - set: {task: task}
+
+  - do:
+      reindex.rethrottle:
+        requests_per_second: 1
+        task_id: $task
+
+  - do:
+      tasks.list:
+        wait_for_completion: true
+        task_id: $task

+ 1 - 0
modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/10_basic.yaml

@@ -54,6 +54,7 @@
       tasks.list:
         wait_for_completion: true
         task_id: $task
+  - is_false: node_failures
 
 ---
 "Response for version conflict":

+ 14 - 0
modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/20_validation.yaml

@@ -25,3 +25,17 @@
       update_by_query:
         index: test
         size: -4
+
+---
+"requests_per_second cannot be an empty string":
+  - do:
+      index:
+        index:   test
+        type:    test
+        id:      1
+        body:    { "text": "test" }
+  - do:
+      catch: /requests_per_second cannot be an empty string/
+      update_by_query:
+        index: test
+        requests_per_second: ''

+ 150 - 0
modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/70_throttle.yaml

@@ -37,3 +37,153 @@
   - match: {updated: 3}
   - gt: {throttled_millis: 1000}
   - lt: {throttled_millis: 4000}
+
+---
+"Rethrottle":
+  # Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
+  # and a small batch size on the request
+  - do:
+      indices.create:
+        index: test
+        body:
+          settings:
+            number_of_shards: 1
+  - do:
+      cluster.health:
+          wait_for_status: yellow
+  - do:
+      index:
+        index:   test
+        type:    foo
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   test
+        type:    foo
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   test
+        type:    foo
+        body:    { "text": "test" }
+  - do:
+      indices.refresh: {}
+
+  - do:
+      update_by_query:
+        requests_per_second: .00000001 # About 9.5 years to complete the request
+        wait_for_completion: false
+        index: test
+        scroll_size: 1
+  - match: {task: '/.+:\d+/'}
+  - set: {task: task}
+
+  - do:
+      reindex.rethrottle:
+        requests_per_second: unlimited
+        task_id: $task
+
+  - do:
+      tasks.list:
+        wait_for_completion: true
+        task_id: $task
+
+---
+"Rethrottle to -1 which also means unlimited":
+  # Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
+  # and a small batch size on the request
+  - do:
+      indices.create:
+        index: test
+        body:
+          settings:
+            number_of_shards: 1
+  - do:
+      cluster.health:
+          wait_for_status: yellow
+  - do:
+      index:
+        index:   test
+        type:    foo
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   test
+        type:    foo
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   test
+        type:    foo
+        body:    { "text": "test" }
+  - do:
+      indices.refresh: {}
+
+  - do:
+      update_by_query:
+        requests_per_second: .00000001 # About 9.5 years to complete the request
+        wait_for_completion: false
+        index: test
+        scroll_size: 1
+  - match: {task: '/.+:\d+/'}
+  - set: {task: task}
+
+  - do:
+      reindex.rethrottle:
+        requests_per_second: -1
+        task_id: $task
+
+  - do:
+      tasks.list:
+        wait_for_completion: true
+        task_id: $task
+
+---
+"Rethrottle but not unlimited":
+  # Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
+  # and a small batch size on the request
+  - do:
+      indices.create:
+        index: test
+        body:
+          settings:
+            number_of_shards: 1
+  - do:
+      cluster.health:
+          wait_for_status: yellow
+  - do:
+      index:
+        index:   test
+        type:    foo
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   test
+        type:    foo
+        body:    { "text": "test" }
+  - do:
+      index:
+        index:   test
+        type:    foo
+        body:    { "text": "test" }
+  - do:
+      indices.refresh: {}
+
+  - do:
+      update_by_query:
+        requests_per_second: .00000001 # About 9.5 years to complete the request
+        wait_for_completion: false
+        index: test
+        scroll_size: 1
+  - match: {task: '/.+:\d+/'}
+  - set: {task: task}
+
+  - do:
+      reindex.rethrottle:
+        requests_per_second: 1
+        task_id: $task
+
+  - do:
+      tasks.list:
+        wait_for_completion: true
+        task_id: $task

+ 24 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/reindex.rethrottle.json

@@ -0,0 +1,24 @@
+{
+  "reindex.rethrottle": {
+    "documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/plugins-reindex.html",
+    "methods": ["POST"],
+    "url": {
+      "path": "/_reindex/{task_id}/_rethrottle",
+      "paths": ["/_reindex/{task_id}/_rethrottle", "/_update_by_query/{task_id}/_rethrottle"],
+      "parts": {
+        "task_id": {
+          "type": "string",
+          "description": "The task id to rethrottle"
+        }
+      },
+      "params": {
+        "requests_per_second": {
+          "type": "float",
+          "default": 0,
+          "description": "The throttle to set on this request in sub-requests per second. 0 means set no throttle. As does \"unlimited\". Otherwise it must be a float."
+        }
+      }
+    },
+    "body": null
+  }
+}

+ 1 - 1
rest-api-spec/src/main/resources/rest-api-spec/api/tasks.cancel.json

@@ -7,7 +7,7 @@
       "paths": ["/_tasks/_cancel", "/_tasks/{task_id}/_cancel"],
       "parts": {
         "task_id": {
-          "type": "number",
+          "type": "string",
           "description": "Cancel the task with specified id"
         }
       },