Ver Fonte

Making reindex data streams actions cancellable (#122438) (#122445)

Keith Massey há 8 meses atrás
pai
commit
a9e99aa977

+ 13 - 0
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/CreateIndexFromSourceAction.java

@@ -15,6 +15,9 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContent;
@@ -191,5 +194,15 @@ public class CreateIndexFromSourceAction extends ActionType<AcknowledgedResponse
         public IndicesOptions indicesOptions() {
             return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
         }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
+        }
+
+        @Override
+        public String getDescription() {
+            return "creating index " + destIndex + " from " + sourceIndex;
+        }
     }
 }

+ 14 - 0
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java

@@ -16,6 +16,9 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.features.NodeFeature;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.xcontent.ConstructingObjectParser;
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.ToXContent;
@@ -24,6 +27,7 @@ import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 import java.util.function.Predicate;
 
@@ -144,5 +148,15 @@ public class ReindexDataStreamAction extends ActionType<AcknowledgedResponse> {
             builder.endObject();
             return builder;
         }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
+        }
+
+        @Override
+        public String getDescription() {
+            return "reindexing data stream " + sourceDataStream;
+        }
     }
 }

+ 14 - 0
x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexAction.java

@@ -14,8 +14,12 @@ import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.Objects;
 
 public class ReindexDataStreamIndexAction extends ActionType<ReindexDataStreamIndexAction.Response> {
@@ -78,6 +82,16 @@ public class ReindexDataStreamIndexAction extends ActionType<ReindexDataStreamIn
         public IndicesOptions indicesOptions() {
             return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
         }
+
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, getDescription(), parentTaskId, headers);
+        }
+
+        @Override
+        public String getDescription() {
+            return "reindexing data stream index " + sourceIndex;
+        }
     }
 
     public static class Response extends ActionResponse {