|
@@ -5,103 +5,101 @@
|
|
|
*/
|
|
|
package org.elasticsearch.xpack.rollup.action;
|
|
|
|
|
|
-import org.elasticsearch.ElasticsearchException;
|
|
|
+import org.elasticsearch.ResourceNotFoundException;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
+import org.elasticsearch.action.ActionListenerResponseHandler;
|
|
|
+import org.elasticsearch.action.FailedNodeException;
|
|
|
+import org.elasticsearch.action.TaskOperationFailure;
|
|
|
import org.elasticsearch.action.support.ActionFilters;
|
|
|
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
|
|
-import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
|
|
+import org.elasticsearch.action.support.tasks.TransportTasksAction;
|
|
|
import org.elasticsearch.cluster.ClusterState;
|
|
|
-import org.elasticsearch.cluster.block.ClusterBlockException;
|
|
|
-import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
|
|
-import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|
|
+import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.inject.Inject;
|
|
|
+import org.elasticsearch.common.io.stream.StreamInput;
|
|
|
import org.elasticsearch.common.settings.Settings;
|
|
|
-import org.elasticsearch.common.unit.TimeValue;
|
|
|
+import org.elasticsearch.discovery.MasterNotDiscoveredException;
|
|
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
|
|
-import org.elasticsearch.persistent.PersistentTasksService;
|
|
|
+import org.elasticsearch.tasks.Task;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
+import org.elasticsearch.xpack.core.indexing.IndexerState;
|
|
|
import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction;
|
|
|
-import org.elasticsearch.xpack.core.rollup.job.RollupJob;
|
|
|
+import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus;
|
|
|
+import org.elasticsearch.xpack.rollup.job.RollupJobTask;
|
|
|
|
|
|
-import java.util.Objects;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.List;
|
|
|
|
|
|
-public class TransportDeleteRollupJobAction
|
|
|
- extends TransportMasterNodeAction<DeleteRollupJobAction.Request, AcknowledgedResponse> {
|
|
|
-
|
|
|
- private final PersistentTasksService persistentTasksService;
|
|
|
+public class TransportDeleteRollupJobAction extends TransportTasksAction<RollupJobTask, DeleteRollupJobAction.Request,
|
|
|
+ DeleteRollupJobAction.Response, DeleteRollupJobAction.Response> {
|
|
|
|
|
|
@Inject
|
|
|
- public TransportDeleteRollupJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
|
|
|
- ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
|
|
- PersistentTasksService persistentTasksService, ClusterService clusterService) {
|
|
|
- super(settings, DeleteRollupJobAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
|
|
- indexNameExpressionResolver, DeleteRollupJobAction.Request::new);
|
|
|
- this.persistentTasksService = persistentTasksService;
|
|
|
+ public TransportDeleteRollupJobAction(Settings settings, TransportService transportService,
|
|
|
+ ActionFilters actionFilters, ClusterService clusterService) {
|
|
|
+ super(settings, DeleteRollupJobAction.NAME, clusterService, transportService, actionFilters,
|
|
|
+ DeleteRollupJobAction.Request::new, DeleteRollupJobAction.Response::new, ThreadPool.Names.SAME);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected String executor() {
|
|
|
- return ThreadPool.Names.SAME;
|
|
|
+ protected void doExecute(Task task, DeleteRollupJobAction.Request request, ActionListener<DeleteRollupJobAction.Response> listener) {
|
|
|
+ final ClusterState state = clusterService.state();
|
|
|
+ final DiscoveryNodes nodes = state.nodes();
|
|
|
+
|
|
|
+ if (nodes.isLocalNodeElectedMaster()) {
|
|
|
+ PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
|
|
|
+ if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
|
|
|
+ super.doExecute(task, request, listener);
|
|
|
+ } else {
|
|
|
+ // If we couldn't find the job in the persistent tasks cluster state, it means it was deleted prior to this call,
|
|
|
+ // no need to go looking for the allocated task
|
|
|
+ listener.onFailure(new ResourceNotFoundException("the task with id [" + request.getId() + "] doesn't exist"));
|
|
|
+ }
|
|
|
+
|
|
|
+ } else {
|
|
|
+ // Delegates DeleteJob to elected master node, so it becomes the coordinating node.
|
|
|
+ // Non-master nodes may have a stale cluster state that shows jobs which are cancelled
|
|
|
+ // on the master, which makes testing difficult.
|
|
|
+ if (nodes.getMasterNode() == null) {
|
|
|
+ listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
|
|
|
+ } else {
|
|
|
+ transportService.sendRequest(nodes.getMasterNode(), actionName, request,
|
|
|
+ new ActionListenerResponseHandler<>(listener, DeleteRollupJobAction.Response::new));
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected AcknowledgedResponse newResponse() {
|
|
|
- return new AcknowledgedResponse();
|
|
|
+ protected void taskOperation(DeleteRollupJobAction.Request request, RollupJobTask jobTask,
|
|
|
+ ActionListener<DeleteRollupJobAction.Response> listener) {
|
|
|
+
|
|
|
+ assert jobTask.getConfig().getId().equals(request.getId());
|
|
|
+ IndexerState state = ((RollupJobStatus) jobTask.getStatus()).getIndexerState();
|
|
|
+ if (state.equals(IndexerState.STOPPED) ) {
|
|
|
+ jobTask.onCancelled();
|
|
|
+ listener.onResponse(new DeleteRollupJobAction.Response(true));
|
|
|
+ } else {
|
|
|
+ listener.onFailure(new IllegalStateException("Could not delete job [" + request.getId() + "] because " +
|
|
|
+ "indexer state is [" + state + "]. Job must be [" + IndexerState.STOPPED + "] before deletion."));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected void masterOperation(DeleteRollupJobAction.Request request, ClusterState state,
|
|
|
- ActionListener<AcknowledgedResponse> listener) throws Exception {
|
|
|
-
|
|
|
- String jobId = request.getId();
|
|
|
- TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option
|
|
|
-
|
|
|
- // Step 1. Cancel the persistent task
|
|
|
- persistentTasksService.sendRemoveRequest(jobId, new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
|
|
|
- @Override
|
|
|
- public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
|
|
|
- logger.debug("Request to cancel Task for Rollup job [" + jobId + "] successful.");
|
|
|
-
|
|
|
- // Step 2. Wait for the task to finish cancellation internally
|
|
|
- persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout,
|
|
|
- new PersistentTasksService.WaitForPersistentTaskListener<RollupJob>() {
|
|
|
- @Override
|
|
|
- public void onResponse(PersistentTasksCustomMetaData.PersistentTask<RollupJob> task) {
|
|
|
- logger.debug("Task for Rollup job [" + jobId + "] successfully canceled.");
|
|
|
- listener.onResponse(new AcknowledgedResponse(true));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- logger.error("Error while cancelling task for Rollup job [" + jobId
|
|
|
- + "]." + e);
|
|
|
- listener.onFailure(e);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onTimeout(TimeValue timeout) {
|
|
|
- String msg = "Stopping of Rollup job [" + jobId + "] timed out after [" + timeout + "].";
|
|
|
- logger.warn(msg);
|
|
|
- listener.onFailure(new ElasticsearchException(msg));
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(Exception e) {
|
|
|
- logger.error("Error while requesting to cancel task for Rollup job [" + jobId
|
|
|
- + "]" + e);
|
|
|
- listener.onFailure(e);
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
+ protected DeleteRollupJobAction.Response newResponse(DeleteRollupJobAction.Request request, List<DeleteRollupJobAction.Response> tasks,
|
|
|
+ List<TaskOperationFailure> taskOperationFailures,
|
|
|
+ List<FailedNodeException> failedNodeExceptions) {
|
|
|
+ // There should theoretically only be one task running the rollup job
|
|
|
+ // If there are more, in production it should be ok as long as they acknowledge shutting down.
|
|
|
+ // But in testing we'd like to know if there was more than one, hence the assert
|
|
|
+ assert tasks.size() + taskOperationFailures.size() == 1;
|
|
|
+ boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(DeleteRollupJobAction.Response::isDeleted);
|
|
|
+ return new DeleteRollupJobAction.Response(cancelled, taskOperationFailures, failedNodeExceptions);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected ClusterBlockException checkBlock(DeleteRollupJobAction.Request request, ClusterState state) {
|
|
|
- return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
|
|
+ protected DeleteRollupJobAction.Response readTaskResponse(StreamInput in) throws IOException {
|
|
|
+ DeleteRollupJobAction.Response response = new DeleteRollupJobAction.Response();
|
|
|
+ response.readFrom(in);
|
|
|
+ return response;
|
|
|
}
|
|
|
}
|