|
@@ -7,9 +7,12 @@
|
|
|
|
|
|
package org.elasticsearch.xpack.migrate.task;
|
|
|
|
|
|
+import org.elasticsearch.common.util.concurrent.RunOnce;
|
|
|
+import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.core.Tuple;
|
|
|
import org.elasticsearch.persistent.AllocatedPersistentTask;
|
|
|
import org.elasticsearch.tasks.TaskId;
|
|
|
+import org.elasticsearch.threadpool.ThreadPool;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
@@ -21,12 +24,14 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
|
|
|
private final long persistentTaskStartTime;
|
|
|
private final int totalIndices;
|
|
|
private final int totalIndicesToBeUpgraded;
|
|
|
- private boolean complete = false;
|
|
|
- private Exception exception;
|
|
|
- private AtomicInteger inProgress = new AtomicInteger(0);
|
|
|
- private AtomicInteger pending = new AtomicInteger();
|
|
|
- private List<Tuple<String, Exception>> errors = new ArrayList<>();
|
|
|
+ private volatile boolean complete = false;
|
|
|
+ private volatile Exception exception;
|
|
|
+ private final AtomicInteger inProgress = new AtomicInteger(0);
|
|
|
+ private final AtomicInteger pending = new AtomicInteger();
|
|
|
+ private final List<Tuple<String, Exception>> errors = new ArrayList<>();
|
|
|
+ private final RunOnce completeTask;
|
|
|
|
|
|
+ @SuppressWarnings("this-escape")
|
|
|
public ReindexDataStreamTask(
|
|
|
long persistentTaskStartTime,
|
|
|
int totalIndices,
|
|
@@ -42,6 +47,13 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
|
|
|
this.persistentTaskStartTime = persistentTaskStartTime;
|
|
|
this.totalIndices = totalIndices;
|
|
|
this.totalIndicesToBeUpgraded = totalIndicesToBeUpgraded;
|
|
|
+ this.completeTask = new RunOnce(() -> {
|
|
|
+ if (exception == null) {
|
|
|
+ markAsCompleted();
|
|
|
+ } else {
|
|
|
+ markAsFailed(exception);
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -58,13 +70,18 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public void allReindexesCompleted() {
|
|
|
+ public void allReindexesCompleted(ThreadPool threadPool, TimeValue timeToLive) {
|
|
|
this.complete = true;
|
|
|
+ if (isCancelled()) {
|
|
|
+ completeTask.run();
|
|
|
+ } else {
|
|
|
+ threadPool.schedule(completeTask, timeToLive, threadPool.generic());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public void taskFailed(Exception e) {
|
|
|
- this.complete = true;
|
|
|
+ public void taskFailed(ThreadPool threadPool, TimeValue timeToLive, Exception e) {
|
|
|
this.exception = e;
|
|
|
+ allReindexesCompleted(threadPool, timeToLive);
|
|
|
}
|
|
|
|
|
|
public void reindexSucceeded() {
|
|
@@ -84,4 +101,16 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
|
|
|
public void setPendingIndicesCount(int size) {
|
|
|
pending.set(size);
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onCancelled() {
|
|
|
+ /*
|
|
|
+ * If the task is complete, but just waiting for its scheduled removal, we go ahead and call markAsCompleted/markAsFailed
|
|
|
+ * immediately. This results in the running task being removed from the task manager. If the task is not complete, then one of
|
|
|
+ * allReindexesCompleted or taskFailed will be called in the future, resulting in the same thing.
|
|
|
+ */
|
|
|
+ if (complete) {
|
|
|
+ completeTask.run();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|