|
@@ -20,15 +20,12 @@
|
|
package org.elasticsearch.index.reindex;
|
|
package org.elasticsearch.index.reindex;
|
|
|
|
|
|
import org.apache.logging.log4j.Logger;
|
|
import org.apache.logging.log4j.Logger;
|
|
-import org.elasticsearch.common.logging.ESLoggerFactory;
|
|
|
|
|
|
+import org.elasticsearch.common.logging.Loggers;
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
|
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
|
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
|
-import org.elasticsearch.tasks.TaskId;
|
|
|
|
-import org.elasticsearch.tasks.TaskInfo;
|
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
-import java.util.List;
|
|
|
|
import java.util.concurrent.ScheduledFuture;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -41,15 +38,19 @@ import static java.lang.Math.round;
|
|
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
|
|
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * {@link BulkByScrollTask} subclass for tasks that actually perform the work. Compare to {@link ParentBulkByScrollTask}.
|
|
|
|
|
|
+ * Task behavior for {@link BulkByScrollTask} that does the actual work of querying and indexing
|
|
*/
|
|
*/
|
|
-public class WorkingBulkByScrollTask extends BulkByScrollTask implements SuccessfullyProcessed {
|
|
|
|
- private static final Logger logger = ESLoggerFactory.getLogger(BulkByScrollTask.class.getPackage().getName());
|
|
|
|
|
|
+public class WorkerBulkByScrollTaskState implements SuccessfullyProcessed {
|
|
|
|
+
|
|
|
|
+ private static final Logger logger = Loggers.getLogger(WorkerBulkByScrollTaskState.class);
|
|
|
|
+
|
|
|
|
+ private final BulkByScrollTask task;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * The id of the slice that this task is processing or {@code null} if this task isn't for a sliced request.
|
|
|
|
|
|
+ * The id of the slice that this worker is processing or {@code null} if this task isn't for a sliced request.
|
|
*/
|
|
*/
|
|
private final Integer sliceId;
|
|
private final Integer sliceId;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* The total number of documents this request will process. 0 means we don't yet know or, possibly, there are actually 0 documents
|
|
* The total number of documents this request will process. 0 means we don't yet know or, possibly, there are actually 0 documents
|
|
* to process. Its ok that these have the same meaning because any request with 0 actual documents should be quite short lived.
|
|
* to process. Its ok that these have the same meaning because any request with 0 actual documents should be quite short lived.
|
|
@@ -64,58 +65,47 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
|
|
private final AtomicLong bulkRetries = new AtomicLong(0);
|
|
private final AtomicLong bulkRetries = new AtomicLong(0);
|
|
private final AtomicLong searchRetries = new AtomicLong(0);
|
|
private final AtomicLong searchRetries = new AtomicLong(0);
|
|
private final AtomicLong throttledNanos = new AtomicLong();
|
|
private final AtomicLong throttledNanos = new AtomicLong();
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* The number of requests per second to which to throttle the request that this task represents. The other variables are all AtomicXXX
|
|
* The number of requests per second to which to throttle the request that this task represents. The other variables are all AtomicXXX
|
|
* style variables but there isn't an AtomicFloat so we just use a volatile.
|
|
* style variables but there isn't an AtomicFloat so we just use a volatile.
|
|
*/
|
|
*/
|
|
private volatile float requestsPerSecond;
|
|
private volatile float requestsPerSecond;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Reference to any the last delayed prepareBulkRequest call. Used during rethrottling and canceling to reschedule the request.
|
|
* Reference to any the last delayed prepareBulkRequest call. Used during rethrottling and canceling to reschedule the request.
|
|
*/
|
|
*/
|
|
private final AtomicReference<DelayedPrepareBulkRequest> delayedPrepareBulkRequestReference = new AtomicReference<>();
|
|
private final AtomicReference<DelayedPrepareBulkRequest> delayedPrepareBulkRequestReference = new AtomicReference<>();
|
|
|
|
|
|
- public WorkingBulkByScrollTask(long id, String type, String action, String description, TaskId parentTask, Integer sliceId,
|
|
|
|
- float requestsPerSecond) {
|
|
|
|
- super(id, type, action, description, parentTask);
|
|
|
|
|
|
+ public WorkerBulkByScrollTaskState(BulkByScrollTask task, Integer sliceId, float requestsPerSecond) {
|
|
|
|
+ this.task = task;
|
|
this.sliceId = sliceId;
|
|
this.sliceId = sliceId;
|
|
setRequestsPerSecond(requestsPerSecond);
|
|
setRequestsPerSecond(requestsPerSecond);
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- public Status getStatus() {
|
|
|
|
- return new Status(sliceId, total.get(), updated.get(), created.get(), deleted.get(), batch.get(), versionConflicts.get(),
|
|
|
|
- noops.get(), bulkRetries.get(), searchRetries.get(), timeValueNanos(throttledNanos.get()), getRequestsPerSecond(),
|
|
|
|
- getReasonCancelled(), throttledUntil());
|
|
|
|
|
|
+ public BulkByScrollTask.Status getStatus() {
|
|
|
|
+ return new BulkByScrollTask.Status(
|
|
|
|
+ sliceId,
|
|
|
|
+ total.get(),
|
|
|
|
+ updated.get(),
|
|
|
|
+ created.get(),
|
|
|
|
+ deleted.get(),
|
|
|
|
+ batch.get(),
|
|
|
|
+ versionConflicts.get(),
|
|
|
|
+ noops.get(),
|
|
|
|
+ bulkRetries.get(),
|
|
|
|
+ searchRetries.get(),
|
|
|
|
+ timeValueNanos(throttledNanos.get()),
|
|
|
|
+ getRequestsPerSecond(),
|
|
|
|
+ task.getReasonCancelled(),
|
|
|
|
+ throttledUntil());
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- protected void onCancelled() {
|
|
|
|
- /* Drop the throttle to 0, immediately rescheduling any throttled
|
|
|
|
- * operation so it will wake up and cancel itself. */
|
|
|
|
|
|
+ public void handleCancel() {
|
|
|
|
+ // Drop the throttle to 0, immediately rescheduling any throttle operation so it will wake up and cancel itself.
|
|
rethrottle(Float.POSITIVE_INFINITY);
|
|
rethrottle(Float.POSITIVE_INFINITY);
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- public int runningSliceSubTasks() {
|
|
|
|
- return 0;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public TaskInfo getInfoGivenSliceInfo(String localNodeId, List<TaskInfo> sliceInfo) {
|
|
|
|
- throw new UnsupportedOperationException("This is only supported by " + ParentBulkByScrollTask.class.getName() + ".");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- TimeValue throttledUntil() {
|
|
|
|
- DelayedPrepareBulkRequest delayed = delayedPrepareBulkRequestReference.get();
|
|
|
|
- if (delayed == null) {
|
|
|
|
- return timeValueNanos(0);
|
|
|
|
- }
|
|
|
|
- if (delayed.future == null) {
|
|
|
|
- return timeValueNanos(0);
|
|
|
|
- }
|
|
|
|
- return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS)));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
public void setTotal(long totalHits) {
|
|
public void setTotal(long totalHits) {
|
|
total.set(totalHits);
|
|
total.set(totalHits);
|
|
}
|
|
}
|
|
@@ -171,6 +161,17 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
|
|
return requestsPerSecond;
|
|
return requestsPerSecond;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ TimeValue throttledUntil() {
|
|
|
|
+ DelayedPrepareBulkRequest delayed = delayedPrepareBulkRequestReference.get();
|
|
|
|
+ if (delayed == null) {
|
|
|
|
+ return timeValueNanos(0);
|
|
|
|
+ }
|
|
|
|
+ if (delayed.future == null) {
|
|
|
|
+ return timeValueNanos(0);
|
|
|
|
+ }
|
|
|
|
+ return timeValueNanos(max(0, delayed.future.getDelay(TimeUnit.NANOSECONDS)));
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
|
|
* Schedule prepareBulkRequestRunnable to run after some delay. This is where throttling plugs into reindexing so the request can be
|
|
* rescheduled over and over again.
|
|
* rescheduled over and over again.
|
|
@@ -180,9 +181,9 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
|
|
// Synchronize so we are less likely to schedule the same request twice.
|
|
// Synchronize so we are less likely to schedule the same request twice.
|
|
synchronized (delayedPrepareBulkRequestReference) {
|
|
synchronized (delayedPrepareBulkRequestReference) {
|
|
TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
|
|
TimeValue delay = throttleWaitTime(lastBatchStartTime, timeValueNanos(System.nanoTime()), lastBatchSize);
|
|
- logger.debug("[{}]: preparing bulk request for [{}]", getId(), delay);
|
|
|
|
|
|
+ logger.debug("[{}]: preparing bulk request for [{}]", task.getId(), delay);
|
|
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
|
|
delayedPrepareBulkRequestReference.set(new DelayedPrepareBulkRequest(threadPool, getRequestsPerSecond(),
|
|
- delay, new RunOnce(prepareBulkRequestRunnable)));
|
|
|
|
|
|
+ delay, new RunOnce(prepareBulkRequestRunnable)));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -213,16 +214,18 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
|
|
this.requestsPerSecond = requestsPerSecond;
|
|
this.requestsPerSecond = requestsPerSecond;
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Apply {@code newRequestsPerSecond} as the new rate limit for this task's search requests
|
|
|
|
+ */
|
|
public void rethrottle(float newRequestsPerSecond) {
|
|
public void rethrottle(float newRequestsPerSecond) {
|
|
synchronized (delayedPrepareBulkRequestReference) {
|
|
synchronized (delayedPrepareBulkRequestReference) {
|
|
- logger.debug("[{}]: rethrottling to [{}] requests per second", getId(), newRequestsPerSecond);
|
|
|
|
|
|
+ logger.debug("[{}]: rethrottling to [{}] requests per second", task.getId(), newRequestsPerSecond);
|
|
setRequestsPerSecond(newRequestsPerSecond);
|
|
setRequestsPerSecond(newRequestsPerSecond);
|
|
|
|
|
|
DelayedPrepareBulkRequest delayedPrepareBulkRequest = this.delayedPrepareBulkRequestReference.get();
|
|
DelayedPrepareBulkRequest delayedPrepareBulkRequest = this.delayedPrepareBulkRequestReference.get();
|
|
if (delayedPrepareBulkRequest == null) {
|
|
if (delayedPrepareBulkRequest == null) {
|
|
// No request has been queued so nothing to reschedule.
|
|
// No request has been queued so nothing to reschedule.
|
|
- logger.debug("[{}]: skipping rescheduling because there is no scheduled task", getId());
|
|
|
|
|
|
+ logger.debug("[{}]: skipping rescheduling because there is no scheduled task", task.getId());
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -260,8 +263,8 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
|
|
* change in throttle take effect the next time we delay
|
|
* change in throttle take effect the next time we delay
|
|
* prepareBulkRequest. We can't just reschedule the request further
|
|
* prepareBulkRequest. We can't just reschedule the request further
|
|
* out in the future because the bulk context might time out. */
|
|
* out in the future because the bulk context might time out. */
|
|
- logger.debug("[{}]: skipping rescheduling because the new throttle [{}] is slower than the old one [{}]", getId(),
|
|
|
|
- newRequestsPerSecond, requestsPerSecond);
|
|
|
|
|
|
+ logger.debug("[{}]: skipping rescheduling because the new throttle [{}] is slower than the old one [{}]", task.getId(),
|
|
|
|
+ newRequestsPerSecond, requestsPerSecond);
|
|
return this;
|
|
return this;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -269,7 +272,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
|
|
// Actually reschedule the task
|
|
// Actually reschedule the task
|
|
if (false == FutureUtils.cancel(future)) {
|
|
if (false == FutureUtils.cancel(future)) {
|
|
// Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here.
|
|
// Couldn't cancel, probably because the task has finished or been scheduled. Either way we have nothing to do here.
|
|
- logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", getId());
|
|
|
|
|
|
+ logger.debug("[{}]: skipping rescheduling because we couldn't cancel the task", task.getId());
|
|
return this;
|
|
return this;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -278,7 +281,7 @@ public class WorkingBulkByScrollTask extends BulkByScrollTask implements Success
|
|
* test it you'll find that requests sneak through. So each request
|
|
* test it you'll find that requests sneak through. So each request
|
|
* is given a runOnce boolean to prevent that. */
|
|
* is given a runOnce boolean to prevent that. */
|
|
TimeValue newDelay = newDelay(remainingDelay, newRequestsPerSecond);
|
|
TimeValue newDelay = newDelay(remainingDelay, newRequestsPerSecond);
|
|
- logger.debug("[{}]: rescheduling for [{}] in the future", getId(), newDelay);
|
|
|
|
|
|
+ logger.debug("[{}]: rescheduling for [{}] in the future", task.getId(), newDelay);
|
|
return new DelayedPrepareBulkRequest(threadPool, requestsPerSecond, newDelay, command);
|
|
return new DelayedPrepareBulkRequest(threadPool, requestsPerSecond, newDelay, command);
|
|
}
|
|
}
|
|
|
|
|