|
@@ -13,29 +13,24 @@ import org.elasticsearch.action.FailedNodeException;
|
|
|
import org.elasticsearch.action.TaskOperationFailure;
|
|
|
import org.elasticsearch.action.support.ActionFilters;
|
|
|
import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
|
|
-import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.inject.Inject;
|
|
|
import org.elasticsearch.common.unit.TimeValue;
|
|
|
-import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
|
|
import org.elasticsearch.rest.RestStatus;
|
|
|
import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
import org.elasticsearch.xpack.core.action.util.PageParams;
|
|
|
-import org.elasticsearch.xpack.core.dataframe.DataFrameField;
|
|
|
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
|
|
|
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
|
|
|
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
|
|
|
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
|
|
|
import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
|
|
|
|
|
|
-import java.util.Collection;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Set;
|
|
|
|
|
|
-import static org.elasticsearch.ExceptionsHelper.convertToElastic;
|
|
|
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
|
|
|
|
|
|
public class TransportStopDataFrameTransformAction extends
|
|
@@ -63,7 +58,7 @@ public class TransportStopDataFrameTransformAction extends
|
|
|
dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap(
|
|
|
expandedIds -> {
|
|
|
request.setExpandedIds(new HashSet<>(expandedIds));
|
|
|
- request.setNodes(dataframeNodes(expandedIds, clusterService.state()));
|
|
|
+ request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state()));
|
|
|
super.doExecute(task, request, listener);
|
|
|
},
|
|
|
listener::onFailure
|
|
@@ -136,48 +131,12 @@ public class TransportStopDataFrameTransformAction extends
|
|
|
List<StopDataFrameTransformAction.Response> tasks, List<TaskOperationFailure> taskOperationFailures,
|
|
|
List<FailedNodeException> failedNodeExceptions) {
|
|
|
|
|
|
- if (taskOperationFailures.isEmpty() == false) {
|
|
|
- throw convertToElastic(taskOperationFailures.get(0).getCause());
|
|
|
- } else if (failedNodeExceptions.isEmpty() == false) {
|
|
|
- throw convertToElastic(failedNodeExceptions.get(0));
|
|
|
- }
|
|
|
-
|
|
|
- // Either the transform doesn't exist (the user didn't create it yet) or was deleted
|
|
|
- // after the Stop API executed.
|
|
|
- // In either case, let the user know
|
|
|
- if (tasks.size() == 0) {
|
|
|
- if (taskOperationFailures.isEmpty() == false) {
|
|
|
- throw convertToElastic(taskOperationFailures.get(0).getCause());
|
|
|
- } else if (failedNodeExceptions.isEmpty() == false) {
|
|
|
- throw convertToElastic(failedNodeExceptions.get(0));
|
|
|
- } else {
|
|
|
- // This can happen we the actual task in the node no longer exists, or was never started
|
|
|
- return new StopDataFrameTransformAction.Response(true);
|
|
|
- }
|
|
|
+ if (taskOperationFailures.isEmpty() == false || failedNodeExceptions.isEmpty() == false) {
|
|
|
+ return new StopDataFrameTransformAction.Response(taskOperationFailures, failedNodeExceptions, false);
|
|
|
}
|
|
|
|
|
|
+ // if tasks is empty allMatch is 'vacuously satisfied'
|
|
|
boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped);
|
|
|
return new StopDataFrameTransformAction.Response(allStopped);
|
|
|
}
|
|
|
-
|
|
|
- static String[] dataframeNodes(List<String> dataFrameIds, ClusterState clusterState) {
|
|
|
-
|
|
|
- Set<String> executorNodes = new HashSet<>();
|
|
|
-
|
|
|
- PersistentTasksCustomMetaData tasksMetaData =
|
|
|
- PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(clusterState);
|
|
|
-
|
|
|
- if (tasksMetaData != null) {
|
|
|
- Set<String> dataFrameIdsSet = new HashSet<>(dataFrameIds);
|
|
|
-
|
|
|
- Collection<PersistentTasksCustomMetaData.PersistentTask<?>> tasks =
|
|
|
- tasksMetaData.findTasks(DataFrameField.TASK_NAME, t -> dataFrameIdsSet.contains(t.getId()));
|
|
|
-
|
|
|
- for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks) {
|
|
|
- executorNodes.add(task.getExecutorNode());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return executorNodes.toArray(new String[0]);
|
|
|
- }
|
|
|
}
|