|
@@ -13,11 +13,13 @@ import org.elasticsearch.action.ActionListener;
|
|
|
import org.elasticsearch.action.support.ActionFilters;
|
|
|
import org.elasticsearch.action.support.HandledTransportAction;
|
|
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|
|
+import org.elasticsearch.client.internal.Client;
|
|
|
import org.elasticsearch.cluster.metadata.DataStream;
|
|
|
import org.elasticsearch.cluster.metadata.Metadata;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.core.TimeValue;
|
|
|
import org.elasticsearch.injection.guice.Inject;
|
|
|
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
|
|
import org.elasticsearch.persistent.PersistentTasksService;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
@@ -38,13 +40,15 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction<Rei
|
|
|
private final PersistentTasksService persistentTasksService;
|
|
|
private final TransportService transportService;
|
|
|
private final ClusterService clusterService;
|
|
|
+ private final Client client;
|
|
|
|
|
|
@Inject
|
|
|
public ReindexDataStreamTransportAction(
|
|
|
TransportService transportService,
|
|
|
ActionFilters actionFilters,
|
|
|
PersistentTasksService persistentTasksService,
|
|
|
- ClusterService clusterService
|
|
|
+ ClusterService clusterService,
|
|
|
+ Client client
|
|
|
) {
|
|
|
super(
|
|
|
ReindexDataStreamAction.NAME,
|
|
@@ -57,6 +61,7 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction<Rei
|
|
|
this.transportService = transportService;
|
|
|
this.persistentTasksService = persistentTasksService;
|
|
|
this.clusterService = clusterService;
|
|
|
+ this.client = client;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -78,6 +83,40 @@ public class ReindexDataStreamTransportAction extends HandledTransportAction<Rei
|
|
|
ClientHelper.getPersistableSafeSecurityHeaders(transportService.getThreadPool().getThreadContext(), clusterService.state())
|
|
|
);
|
|
|
String persistentTaskId = getPersistentTaskId(sourceDataStreamName);
|
|
|
+
|
|
|
+ PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state()
|
|
|
+ .getMetadata()
|
|
|
+ .custom(PersistentTasksCustomMetadata.TYPE);
|
|
|
+ PersistentTasksCustomMetadata.PersistentTask<?> persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId);
|
|
|
+
|
|
|
+ if (persistentTask == null) {
|
|
|
+ startTask(listener, persistentTaskId, params);
|
|
|
+ } else {
|
|
|
+ GetMigrationReindexStatusAction.Request statusRequest = new GetMigrationReindexStatusAction.Request(sourceDataStreamName);
|
|
|
+ statusRequest.setParentTask(task.getParentTaskId());
|
|
|
+ client.execute(
|
|
|
+ GetMigrationReindexStatusAction.INSTANCE,
|
|
|
+ statusRequest,
|
|
|
+ listener.delegateFailureAndWrap((getListener, getResponse) -> {
|
|
|
+ if (getResponse.getEnrichedStatus().complete() == false) {
|
|
|
+ throw new ResourceAlreadyExistsException("Reindex task for data stream [{}] already exists", sourceDataStreamName);
|
|
|
+ }
|
|
|
+ CancelReindexDataStreamAction.Request cancelRequest = new CancelReindexDataStreamAction.Request(sourceDataStreamName);
|
|
|
+ cancelRequest.setParentTask(task.getParentTaskId());
|
|
|
+ client.execute(
|
|
|
+ CancelReindexDataStreamAction.INSTANCE,
|
|
|
+ cancelRequest,
|
|
|
+ getListener.delegateFailureAndWrap(
|
|
|
+ (cancelListener, cancelResponse) -> startTask(cancelListener, persistentTaskId, params)
|
|
|
+ )
|
|
|
+ );
|
|
|
+ })
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void startTask(ActionListener<AcknowledgedResponse> listener, String persistentTaskId, ReindexDataStreamTaskParams params) {
|
|
|
persistentTasksService.sendStartRequest(
|
|
|
persistentTaskId,
|
|
|
ReindexDataStreamTask.TASK_NAME,
|