
[ML] Complete the Data Frame task on stop (#41752)

Wait for the indexer to stop, then complete the persistent task.
If wait_for_completion is true, the request does not return until the transform has stopped.
David Kyle, 6 years ago
parent
commit 9d94d57531
15 changed files with 249 additions and 277 deletions
  1. client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java (+3, -2)
  2. client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java (+2, -1)
  3. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java (+12, -75)
  4. x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java (+19, -5)
  5. x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformActionResponseTests.java (+0, -22)
  6. x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java (+65, -10)
  7. x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java (+3, -3)
  8. x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java (+8, -3)
  9. x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java (+38, -58)
  10. x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java (+0, -1)
  11. x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java (+0, -4)
  12. x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java (+81, -50)
  13. x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java (+2, -1)
  14. x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java (+12, -42)
  15. x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml (+4, -0)

+ 3 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

@@ -141,7 +141,8 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
     @After
     public void cleanUpTransforms() throws IOException {
         for (String transformId : transformsToClean) {
-            highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
+            highLevelClient().dataFrame().stopDataFrameTransform(
+                    new StopDataFrameTransformRequest(transformId, Boolean.TRUE, null), RequestOptions.DEFAULT);
         }
 
         for (String transformId : transformsToClean) {
@@ -265,7 +266,7 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
         assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());
 
-        StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id);
+        StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id, Boolean.TRUE, null);
         StopDataFrameTransformResponse stopResponse =
                 execute(stopRequest, client::stopDataFrameTransform, client::stopDataFrameTransformAsync);
         assertTrue(stopResponse.isStopped());

+ 2 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

@@ -76,7 +76,8 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
     @After
     public void cleanUpTransforms() throws IOException {
         for (String transformId : transformsToClean) {
-            highLevelClient().dataFrame().stopDataFrameTransform(new StopDataFrameTransformRequest(transformId), RequestOptions.DEFAULT);
+            highLevelClient().dataFrame().stopDataFrameTransform(
+                    new StopDataFrameTransformRequest(transformId, Boolean.TRUE, TimeValue.timeValueSeconds(20)), RequestOptions.DEFAULT);
         }
 
         for (String transformId : transformsToClean) {
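
The cleanup code above now passes an explicit wait-for-completion flag (and, in the documentation test, a timeout) when stopping a transform. A minimal sketch of the same call from an application using the high-level REST client — the transform id, timeout, and client setup are illustrative:

    // Stop the transform and block until its persistent task has completed.
    // Assumes a configured RestHighLevelClient named "client"; id and timeout are examples.
    StopDataFrameTransformRequest stopRequest =
            new StopDataFrameTransformRequest("my-transform", Boolean.TRUE, TimeValue.timeValueSeconds(30));
    StopDataFrameTransformResponse stopResponse =
            client.dataFrame().stopDataFrameTransform(stopRequest, RequestOptions.DEFAULT);
    assert stopResponse.isStopped();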

+ 12 - 75
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformAction.java

@@ -7,25 +7,18 @@ package org.elasticsearch.xpack.core.dataframe.action;
 
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.FailedNodeException;
-import org.elasticsearch.action.TaskOperationFailure;
-import org.elasticsearch.action.support.tasks.BaseTasksRequest;
-import org.elasticsearch.action.support.tasks.BaseTasksResponse;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.MasterNodeRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
-import org.elasticsearch.common.xcontent.ToXContentObject;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.tasks.Task;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.utils.ExceptionsHelper;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
 import java.util.Objects;
 
-public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransformAction.Response> {
+public class DeleteDataFrameTransformAction extends Action<AcknowledgedResponse> {
 
     public static final DeleteDataFrameTransformAction INSTANCE = new DeleteDataFrameTransformAction();
     public static final String NAME = "cluster:admin/data_frame/delete";
@@ -35,17 +28,21 @@ public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransf
     }
 
     @Override
-    public Response newResponse() {
+    public AcknowledgedResponse newResponse() {
         throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
     }
 
     @Override
-    public Writeable.Reader<Response> getResponseReader() {
-        return Response::new;
+    public Writeable.Reader<AcknowledgedResponse> getResponseReader() {
+        return in -> {
+            AcknowledgedResponse response = new AcknowledgedResponse();
+            response.readFrom(in);
+            return response;
+        };
     }
 
-    public static class Request extends BaseTasksRequest<Request> {
-        private final String id;
+    public static class Request extends MasterNodeRequest<Request> {
+        private String id;
 
         public Request(String id) {
             this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
@@ -60,11 +57,6 @@ public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransf
             return id;
         }
 
-        @Override
-        public boolean match(Task task) {
-            return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
-        }
-
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
@@ -94,59 +86,4 @@ public class DeleteDataFrameTransformAction extends Action<DeleteDataFrameTransf
             return Objects.equals(id, other.id);
         }
     }
-
-    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
-
-        private final boolean acknowledged;
-
-        public Response(StreamInput in) throws IOException {
-            super(in);
-            acknowledged = in.readBoolean();
-        }
-
-        public Response(boolean acknowledged, List<TaskOperationFailure> taskFailures, List<FailedNodeException> nodeFailures) {
-            super(taskFailures, nodeFailures);
-            this.acknowledged = acknowledged;
-        }
-
-        public Response(boolean acknowledged) {
-            this(acknowledged, Collections.emptyList(), Collections.emptyList());
-        }
-
-        public boolean isDeleted() {
-            return acknowledged;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeBoolean(acknowledged);
-        }
-
-        @Override
-        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            builder.startObject();
-            {
-                toXContentCommon(builder, params);
-                builder.field("acknowledged", acknowledged);
-            }
-            builder.endObject();
-            return builder;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-            DeleteDataFrameTransformAction.Response response = (DeleteDataFrameTransformAction.Response) o;
-            return super.equals(o) && acknowledged == response.acknowledged;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(super.hashCode(), acknowledged);
-        }
-    }
 }

+ 19 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

@@ -22,9 +22,11 @@ import java.util.concurrent.atomic.AtomicReference;
  * An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
  * it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
  * Only one background job can run simultaneously and {@link #onFinish} is called when the job
- * finishes. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()} is called if the indexer is
- * aborted while a job is running. The indexer must be started ({@link #start()} to allow a background job to run when
- * {@link #maybeTriggerAsyncJob(long)} is called. {@link #stop()} can be used to stop the background job without aborting the indexer.
+ * finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call
+ * to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
+ * is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()}
+ * to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called.
+ * {@link #stop()} can be used to stop the background job without aborting the indexer.
  *
  * In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query,
  * indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input.
@@ -84,8 +86,10 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
 
     /**
      * Sets the internal state to {@link IndexerState#STOPPING} if an async job is
-     * running in the background. If there is no job running when this function is
-     * called, the state is directly set to {@link IndexerState#STOPPED}.
+     * running in the background, {@link #onStop()} will be called when the background job
+     * detects that the indexer is stopped.
+     * If there is no job running when this function is called
+     * the state is set to {@link IndexerState#STOPPED} and {@link #onStop()} called directly.
      *
      * @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
      */
@@ -94,6 +98,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
             if (previousState == IndexerState.INDEXING) {
                 return IndexerState.STOPPING;
             } else if (previousState == IndexerState.STARTED) {
+                onStop();
                 return IndexerState.STOPPED;
             } else {
                 return previousState;
@@ -251,6 +256,14 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
      */
     protected abstract void onFinish(ActionListener<Void> listener);
 
+    /**
+     * Called when the indexer is stopped. This is only called when the indexer is stopped
+     * via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called
+     * when the indexer's work is done.
+     */
+    protected void onStop() {
+    }
+
     /**
      * Called when a background job detects that the indexer is aborted causing the
      * async execution to stop.
@@ -276,6 +289,7 @@ public abstract class AsyncTwoPhaseIndexer<JobPosition, JobStats extends Indexer
 
             case STOPPING:
                 // must be started again
+                onStop();
                 return IndexerState.STOPPED;
 
             case ABORTING:
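
The updated Javadoc above describes the new contract: stopping an INDEXING job moves it to STOPPING and onStop() fires once the in-flight search returns, while a merely STARTED indexer goes straight to STOPPED with onStop() invoked directly. A simplified, self-contained sketch of that transition (this is not the Elasticsearch class, just the state logic it describes):

    // Simplified illustration of the stop()/onStop() contract; names are abbreviated.
    import java.util.concurrent.atomic.AtomicReference;

    class StoppableIndexer {
        enum State { STARTED, INDEXING, STOPPING, STOPPED, ABORTING }

        private final AtomicReference<State> state = new AtomicReference<>(State.STARTED);

        /** Hook fired when the indexer actually stops, analogous to onStop(). */
        protected void onStop() {
            System.out.println("indexer stopped");
        }

        State stop() {
            State previous = state.getAndUpdate(prev -> {
                if (prev == State.INDEXING) {
                    return State.STOPPING;   // background job calls onStop() when its search returns
                }
                if (prev == State.STARTED) {
                    return State.STOPPED;    // nothing running, stop immediately
                }
                return prev;                 // STOPPED/STOPPING/ABORTING stay as they are
            });
            if (previous == State.STARTED) {
                onStop();                    // called directly, matching the STARTED branch above
                return State.STOPPED;
            }
            return previous == State.INDEXING ? State.STOPPING : previous;
        }
    }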

+ 0 - 22
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/action/DeleteDataFrameTransformActionResponseTests.java

@@ -1,22 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-
-package org.elasticsearch.xpack.core.dataframe.action;
-
-import org.elasticsearch.common.io.stream.Writeable.Reader;
-import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Response;
-
-public class DeleteDataFrameTransformActionResponseTests extends AbstractWireSerializingDataFrameTestCase<Response> {
-    @Override
-    protected Response createTestInstance() {
-        return new Response(randomBoolean());
-    }
-
-    @Override
-    protected Reader<Response> instanceReader() {
-        return Response::new;
-    }
-}

+ 65 - 10
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

@@ -18,6 +18,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -34,17 +35,26 @@ import static org.hamcrest.Matchers.equalTo;
 public class AsyncTwoPhaseIndexerTests extends ESTestCase {
 
     AtomicBoolean isFinished = new AtomicBoolean(false);
+    AtomicBoolean isStopped = new AtomicBoolean(false);
+
+    @Before
+    public void reset() {
+        isFinished.set(false);
+        isStopped.set(false);
+    }
 
     private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
 
         private final CountDownLatch latch;
         // test the execution order
         private volatile int step;
+        private final boolean stoppedBeforeFinished;
 
         protected MockIndexer(Executor executor, AtomicReference<IndexerState> initialState, Integer initialPosition,
-                              CountDownLatch latch) {
+                              CountDownLatch latch, boolean stoppedBeforeFinished) {
             super(executor, initialState, initialPosition, new MockJobStats());
             this.latch = latch;
+            this.stoppedBeforeFinished = stoppedBeforeFinished;
         }
 
         @Override
@@ -57,7 +67,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             awaitForLatch();
             assertThat(step, equalTo(3));
             ++step;
-            return new IterationResult<Integer>(Collections.emptyList(), 3, true);
+            return new IterationResult<>(Collections.emptyList(), 3, true);
         }
 
         private void awaitForLatch() {
@@ -99,7 +109,8 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
 
         @Override
         protected void doSaveState(IndexerState state, Integer position, Runnable next) {
-            assertThat(step, equalTo(5));
+            int expectedStep = stoppedBeforeFinished ? 3 : 5;
+            assertThat(step, equalTo(expectedStep));
             ++step;
             next.run();
         }
@@ -114,7 +125,12 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             assertThat(step, equalTo(4));
             ++step;
             listener.onResponse(null);
-            isFinished.set(true);
+            assertTrue(isFinished.compareAndSet(false, true));
+        }
+
+        @Override
+        protected void onStop() {
+            assertTrue(isStopped.compareAndSet(false, true));
         }
 
         @Override
@@ -180,7 +196,7 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
         protected void onFailure(Exception exc) {
             assertThat(step, equalTo(2));
             ++step;
-            isFinished.set(true);
+            assertTrue(isFinished.compareAndSet(false, true));
         }
 
         @Override
@@ -209,10 +225,9 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
     public void testStateMachine() throws Exception {
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
         final ExecutorService executor = Executors.newFixedThreadPool(1);
-        isFinished.set(false);
         try {
             CountDownLatch countDownLatch = new CountDownLatch(1);
-            MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch);
+            MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
             indexer.start();
             assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
             assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
@@ -220,7 +235,8 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
             countDownLatch.countDown();
 
             assertThat(indexer.getPosition(), equalTo(2));
-            ESTestCase.awaitBusy(() -> isFinished.get());
+            assertTrue(awaitBusy(() -> isFinished.get()));
+            assertFalse(isStopped.get());
             assertThat(indexer.getStep(), equalTo(6));
             assertThat(indexer.getStats().getNumInvocations(), equalTo(1L));
             assertThat(indexer.getStats().getNumPages(), equalTo(1L));
@@ -234,18 +250,57 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
     public void testStateMachineBrokenSearch() throws InterruptedException {
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
         final ExecutorService executor = Executors.newFixedThreadPool(1);
-        isFinished.set(false);
         try {
 
             MockIndexerThrowsFirstSearch indexer = new MockIndexerThrowsFirstSearch(executor, state, 2);
             indexer.start();
             assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
             assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
-            assertTrue(ESTestCase.awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
+            assertTrue(awaitBusy(() -> isFinished.get(), 10000, TimeUnit.SECONDS));
             assertThat(indexer.getStep(), equalTo(3));
 
         } finally {
             executor.shutdownNow();
         }
     }
+
+    public void testStop_AfterIndexerIsFinished() throws InterruptedException {
+        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
+            indexer.start();
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            countDownLatch.countDown();
+            assertTrue(awaitBusy(() -> isFinished.get()));
+
+            indexer.stop();
+            assertTrue(isStopped.get());
+            assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    public void testStop_WhileIndexing() throws InterruptedException {
+        AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
+        final ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            CountDownLatch countDownLatch = new CountDownLatch(1);
+            MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, true);
+            indexer.start();
+            assertThat(indexer.getState(), equalTo(IndexerState.STARTED));
+            assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
+            assertThat(indexer.getState(), equalTo(IndexerState.INDEXING));
+            indexer.stop();
+            countDownLatch.countDown();
+
+            assertThat(indexer.getPosition(), equalTo(2));
+            assertTrue(awaitBusy(() -> isStopped.get()));
+            assertFalse(isFinished.get());
+        } finally {
+            executor.shutdownNow();
+        }
+    }
 }

+ 3 - 3
x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java

@@ -93,11 +93,11 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
             new StartDataFrameTransformAction.Request(id, false)).actionGet();
     }
 
-    protected DeleteDataFrameTransformAction.Response deleteDataFrameTransform(String id) {
-        DeleteDataFrameTransformAction.Response response = client().execute(DeleteDataFrameTransformAction.INSTANCE,
+    protected AcknowledgedResponse deleteDataFrameTransform(String id) {
+        AcknowledgedResponse response = client().execute(DeleteDataFrameTransformAction.INSTANCE,
             new DeleteDataFrameTransformAction.Request(id))
             .actionGet();
-        if (response.isDeleted()) {
+        if (response.isAcknowledged()) {
             transformConfigs.remove(id);
         }
         return response;

+ 8 - 3
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

@@ -21,6 +21,7 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
+import org.junit.After;
 import org.junit.AfterClass;
 
 import java.io.IOException;
@@ -272,16 +273,20 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
         adminClient().performRequest(request);
     }
 
-    @AfterClass
-    public static void removeIndices() throws Exception {
+    @After
+    public void waitForDataFrame() throws Exception {
         wipeDataFrameTransforms();
         waitForPendingDataFrameTasks();
+    }
+
+    @AfterClass
+    public static void removeIndices() throws Exception {
         // we might have disabled wiping indices, but now its time to get rid of them
         // note: can not use super.cleanUpCluster() as this method must be static
         wipeIndices();
     }
 
-    protected static void wipeDataFrameTransforms() throws IOException, InterruptedException {
+    public void wipeDataFrameTransforms() throws IOException, InterruptedException {
         List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
         for (Map<String, Object> transformConfig : transformConfigs) {
             String transformId = (String) transformConfig.get("id");

+ 38 - 58
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportDeleteDataFrameTransformAction.java

@@ -5,93 +5,73 @@
  */
 package org.elasticsearch.xpack.dataframe.action;
 
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionListenerResponseHandler;
-import org.elasticsearch.action.FailedNodeException;
-import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.tasks.TransportTasksAction;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.discovery.MasterNotDiscoveredException;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
-import org.elasticsearch.tasks.Task;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Request;
-import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction.Response;
-import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
-import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
 
-import java.util.List;
+import java.io.IOException;
 
-public class TransportDeleteDataFrameTransformAction extends TransportTasksAction<DataFrameTransformTask, Request, Response, Response> {
+public class TransportDeleteDataFrameTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
 
     private final DataFrameTransformsConfigManager transformsConfigManager;
 
     @Inject
-    public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
-            ClusterService clusterService, DataFrameTransformsConfigManager transformsConfigManager) {
-        super(DeleteDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
-                Response::new, ThreadPool.Names.SAME);
+    public TransportDeleteDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters, ThreadPool threadPool,
+                                                   ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
+                                                   DataFrameTransformsConfigManager transformsConfigManager) {
+        super(DeleteDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters,
+                Request::new, indexNameExpressionResolver);
         this.transformsConfigManager = transformsConfigManager;
     }
 
     @Override
-    protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
-            List<FailedNodeException> failedNodeExceptions) {
-        assert tasks.size() + taskOperationFailures.size() == 1;
-        boolean cancelled = tasks.size() > 0 && tasks.stream().allMatch(Response::isDeleted);
+    protected String executor() {
+        return ThreadPool.Names.SAME;
+    }
+
+    @Override
+    protected AcknowledgedResponse newResponse() {
+        return new AcknowledgedResponse();
+    }
 
-        return new Response(cancelled, taskOperationFailures, failedNodeExceptions);
+    protected AcknowledgedResponse read(StreamInput in) throws IOException {
+        AcknowledgedResponse response = new AcknowledgedResponse();
+        response.readFrom(in);
+        return response;
     }
 
     @Override
-    protected void taskOperation(Request request, DataFrameTransformTask task, ActionListener<Response> listener) {
-        assert task.getTransformId().equals(request.getId());
-        IndexerState state = task.getState().getIndexerState();
-        if (state.equals(IndexerState.STOPPED)) {
-            task.onCancelled();
-            transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
-                listener.onResponse(new Response(true));
-            }, listener::onFailure));
+    protected void masterOperation(Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) throws Exception {
+        PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+        if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
+            listener.onFailure(new ElasticsearchStatusException("Cannot delete data frame [" + request.getId() +
+                    "] as the task is running. Stop the task first", RestStatus.CONFLICT));
         } else {
-            listener.onFailure(new IllegalStateException("Could not delete transform [" + request.getId() + "] because "
-                    + "indexer state is [" + state + "].  Transform must be [" + IndexerState.STOPPED + "] before deletion."));
+            // Task is not running, delete the configuration document
+            transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(
+                    r -> listener.onResponse(new AcknowledgedResponse(r)),
+                    listener::onFailure));
         }
     }
 
     @Override
-    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
-        final ClusterState state = clusterService.state();
-        final DiscoveryNodes nodes = state.nodes();
-        if (nodes.isLocalNodeElectedMaster()) {
-            PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
-            if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
-                super.doExecute(task, request, listener);
-            } else {
-                // we couldn't find the transform in the persistent task CS, but maybe the transform exists in the configuration index,
-                // if so delete the orphaned document and do not throw (for the normal case we want to stop the task first,
-                // than delete the configuration document if and only if the data frame transform is in stopped state)
-                transformsConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
-                    listener.onResponse(new Response(true));
-                    return;
-                }, listener::onFailure));
-            }
-        } else {
-            // Delegates DeleteTransform to elected master node, so it becomes the coordinating node.
-            // Non-master nodes may have a stale cluster state that shows transforms which are cancelled
-            // on the master, which makes testing difficult.
-            if (nodes.getMasterNode() == null) {
-                listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
-            } else {
-                transportService.sendRequest(nodes.getMasterNode(), actionName, request,
-                        new ActionListenerResponseHandler<>(listener, Response::new));
-            }
-        }
+    protected ClusterBlockException checkBlock(Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
     }
 }
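
Deleting a transform is now a master node action: while a persistent task for the transform still exists in the cluster state the request fails with a 409 CONFLICT, otherwise the configuration document is deleted. The guard itself is easy to show in isolation — the set of running task ids below stands in for PersistentTasksCustomMetaData and the names are illustrative:

    import java.util.Set;

    final class DeleteGuard {
        /** Refuse to delete while the transform's persistent task is still registered. */
        static void deleteTransform(String transformId, Set<String> runningTaskIds, Runnable deleteConfigDocument) {
            if (runningTaskIds.contains(transformId)) {
                throw new IllegalStateException(
                    "Cannot delete data frame [" + transformId + "] as the task is running. Stop the task first");
            }
            deleteConfigDocument.run();   // task not running: remove the configuration document
        }
    }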

+ 0 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java

@@ -132,7 +132,6 @@ public class TransportGetDataFrameTransformsStatsAction extends
             },
             e -> {
                 // If the index to search, or the individual config is not there, just return empty
-                logger.error("failed to expand ids", e);
                 if (e instanceof ResourceNotFoundException) {
                     finalListener.onResponse(new Response(Collections.emptyList()));
                 } else {

+ 0 - 4
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java

@@ -6,8 +6,6 @@
 
 package org.elasticsearch.xpack.dataframe.action;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.ActionListener;
@@ -63,8 +61,6 @@ import java.util.stream.Collectors;
 public class TransportPutDataFrameTransformAction
         extends TransportMasterNodeAction<PutDataFrameTransformAction.Request, AcknowledgedResponse> {
 
-    private static final Logger logger = LogManager.getLogger(TransportPutDataFrameTransformAction.class);
-
     private final XPackLicenseState licenseState;
     private final Client client;
     private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;

+ 81 - 50
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java

@@ -5,64 +5,85 @@
  */
 package org.elasticsearch.xpack.dataframe.action;
 
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchStatusException;
-import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.tasks.TransportTasksAction;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.discovery.MasterNotDiscoveredException;
+import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.action.util.PageParams;
-import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformTaskState;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
 
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
-
 public class TransportStopDataFrameTransformAction extends
         TransportTasksAction<DataFrameTransformTask, StopDataFrameTransformAction.Request,
         StopDataFrameTransformAction.Response, StopDataFrameTransformAction.Response> {
 
-    private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
     private final ThreadPool threadPool;
     private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
+    private final PersistentTasksService persistentTasksService;
 
     @Inject
     public TransportStopDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
                                                  ClusterService clusterService, ThreadPool threadPool,
+                                                 PersistentTasksService persistentTasksService,
                                                  DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
         super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new,
                 StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME);
         this.threadPool = threadPool;
         this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
+        this.persistentTasksService = persistentTasksService;
     }
 
     @Override
     protected void doExecute(Task task, StopDataFrameTransformAction.Request request,
             ActionListener<StopDataFrameTransformAction.Response> listener) {
+        final ClusterState state = clusterService.state();
+        final DiscoveryNodes nodes = state.nodes();
+        if (nodes.isLocalNodeElectedMaster() == false) {
+            // Delegates stop data frame to elected master node so it becomes the coordinating node.
+            if (nodes.getMasterNode() == null) {
+                listener.onFailure(new MasterNotDiscoveredException("no known master node"));
+            } else {
+                transportService.sendRequest(nodes.getMasterNode(), actionName, request,
+                        new ActionListenerResponseHandler<>(listener, StopDataFrameTransformAction.Response::new));
+            }
+        } else {
+            final ActionListener<StopDataFrameTransformAction.Response> finalListener;
+            if (request.waitForCompletion()) {
+                finalListener = waitForStopListener(request, listener);
+            } else {
+                finalListener = listener;
+            }
 
-        dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap(
-                expandedIds -> {
-                    request.setExpandedIds(new HashSet<>(expandedIds));
-                    request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state()));
-                    super.doExecute(task, request, listener);
-                },
-                listener::onFailure
-        ));
+            dataFrameTransformsConfigManager.expandTransformIds(request.getId(), new PageParams(0, 10_000), ActionListener.wrap(
+                    expandedIds -> {
+                        request.setExpandedIds(new HashSet<>(expandedIds));
+                        request.setNodes(DataFrameNodes.dataFrameTaskNodes(expandedIds, clusterService.state()));
+                        super.doExecute(task, request, finalListener);
+                    },
+                    listener::onFailure
+            ));
+        }
     }
 
     @Override
@@ -84,42 +105,9 @@ public class TransportStopDataFrameTransformAction extends
                         RestStatus.CONFLICT));
                 return;
             }
-            if (request.waitForCompletion() == false) {
-                transformTask.stop(listener);
-            } else {
-                ActionListener<StopDataFrameTransformAction.Response> blockingListener = ActionListener.wrap(response -> {
-                    if (response.isStopped()) {
-                        // The Task acknowledged that it is stopped/stopping... wait until the status actually
-                        // changes over before returning. Switch over to Generic threadpool so
-                        // we don't block the network thread
-                        threadPool.generic().execute(() -> {
-                            try {
-                                long untilInNanos = System.nanoTime() + request.getTimeout().getNanos();
-
-                                while (System.nanoTime() - untilInNanos < 0) {
-                                    if (transformTask.isStopped()) {
-                                        listener.onResponse(response);
-                                        return;
-                                    }
-                                    Thread.sleep(WAIT_FOR_COMPLETION_POLL.millis());
-                                }
-                                // ran out of time
-                                listener.onFailure(new ElasticsearchTimeoutException(
-                                        DataFrameMessages.getMessage(DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_TIMEOUT,
-                                                request.getTimeout().getStringRep(), request.getId())));
-                            } catch (InterruptedException e) {
-                                listener.onFailure(new ElasticsearchException(DataFrameMessages.getMessage(
-                                        DataFrameMessages.REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT, request.getId()), e));
-                            }
-                        });
-                    } else {
-                        // Did not acknowledge stop, just return the response
-                        listener.onResponse(response);
-                    }
-                }, listener::onFailure);
-
-                transformTask.stop(blockingListener);
-            }
+
+            transformTask.stop();
+            listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE));
         } else {
             listener.onFailure(new RuntimeException("ID of data frame indexer task [" + transformTask.getTransformId()
                     + "] does not match request's ID [" + request.getId() + "]"));
@@ -139,4 +127,47 @@ public class TransportStopDataFrameTransformAction extends
         boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped);
         return new StopDataFrameTransformAction.Response(allStopped);
     }
+
+    private ActionListener<StopDataFrameTransformAction.Response>
+    waitForStopListener(StopDataFrameTransformAction.Request request,
+                        ActionListener<StopDataFrameTransformAction.Response> listener) {
+
+        return ActionListener.wrap(
+                response -> {
+                    // Wait until the persistent task is stopped
+                    // Switch over to Generic threadpool so we don't block the network thread
+                    threadPool.generic().execute(() ->
+                        waitForDataFrameStopped(request.getExpandedIds(), request.getTimeout(), listener));
+                },
+                listener::onFailure
+        );
+    }
+
+    private void waitForDataFrameStopped(Collection<String> persistentTaskIds, TimeValue timeout,
+                                         ActionListener<StopDataFrameTransformAction.Response> listener) {
+        persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> {
+
+            if (persistentTasksCustomMetaData == null) {
+                return true;
+            }
+
+            for (String persistentTaskId : persistentTaskIds) {
+                if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) {
+                    return false;
+                }
+            }
+            return true;
+
+        }, timeout, new ActionListener<>() {
+            @Override
+            public void onResponse(Boolean result) {
+                listener.onResponse(new StopDataFrameTransformAction.Response(Boolean.TRUE));
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                listener.onFailure(e);
+            }
+        });
+    }
 }
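
waitForDataFrameStopped above hands persistentTasksService.waitForPersistentTasksCondition a predicate that succeeds once none of the expanded transform ids has a persistent task left. That condition is simple enough to restate over plain collections — a hedged sketch, with a set of currently registered task ids standing in for the cluster-state metadata:

    import java.util.Collection;
    import java.util.Set;

    final class StopCondition {
        /** True once every requested transform's persistent task has disappeared. */
        static boolean allTransformTasksStopped(Collection<String> expandedIds, Set<String> registeredTaskIds) {
            if (registeredTaskIds == null) {
                return true;                  // no persistent task metadata at all: nothing is running
            }
            for (String id : expandedIds) {
                if (registeredTaskIds.contains(id)) {
                    return false;             // this transform's task still exists, keep waiting
                }
            }
            return true;
        }
    }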

+ 2 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestDeleteDataFrameTransformAction.java

@@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
 
@@ -33,7 +34,7 @@ public class RestDeleteDataFrameTransformAction extends BaseRestHandler {
         DeleteDataFrameTransformAction.Request request = new DeleteDataFrameTransformAction.Request(id);
 
         return channel -> client.execute(DeleteDataFrameTransformAction.INSTANCE, request,
-                new BaseTasksResponseToXContentListener<>(channel));
+                new RestToXContentListener<>(channel));
     }
 
     @Override

+ 12 - 42
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -27,7 +27,6 @@ import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
-import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@@ -85,7 +84,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         String initialReason = null;
         long initialGeneration = 0;
         Map<String, Object> initialPosition = null;
-        logger.info("[{}] init, got state: [{}]", transform.getId(), state != null);
+        logger.trace("[{}] init, got state: [{}]", transform.getId(), state != null);
         if (state != null) {
             initialTaskState = state.getTaskState();
             initialReason = state.getReason();
@@ -218,51 +217,17 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
         ));
     }
 
-    public synchronized void stop(ActionListener<StopDataFrameTransformAction.Response> listener) {
+    public synchronized void stop() {
         if (getIndexer() == null) {
-            listener.onFailure(new ElasticsearchException("Task for transform [{}] not fully initialized. Try again later",
-                getTransformId()));
             return;
         }
         // taskState is initialized as STOPPED and is updated in tandem with the indexerState
         // Consequently, if it is STOPPED, we consider the whole task STOPPED.
         if (taskState.get() == DataFrameTransformTaskState.STOPPED) {
-            listener.onResponse(new StopDataFrameTransformAction.Response(true));
             return;
         }
-        final IndexerState newState = getIndexer().stop();
-        switch (newState) {
-        case STOPPED:
-            // Fall through to `STOPPING` as the behavior is the same for both, we should persist for both
-        case STOPPING:
-            // update the persistent state to STOPPED. There are two scenarios and both are safe:
-            // 1. we persist STOPPED now, indexer continues a bit then sees the flag and checkpoints another STOPPED with the more recent
-            // position.
-            // 2. we persist STOPPED now, indexer continues a bit but then dies. When/if we resume we'll pick up at last checkpoint,
-            // overwrite some docs and eventually checkpoint.
-            taskState.set(DataFrameTransformTaskState.STOPPED);
-            DataFrameTransformState state = new DataFrameTransformState(
-                DataFrameTransformTaskState.STOPPED,
-                IndexerState.STOPPED,
-                getIndexer().getPosition(),
-                currentCheckpoint.get(),
-                stateReason.get(),
-                getIndexer().getProgress());
-            persistStateToClusterState(state, ActionListener.wrap(
-                task -> {
-                    auditor.info(transform.getId(), "Updated state to [" + state.getTaskState() + "]");
-                    listener.onResponse(new StopDataFrameTransformAction.Response(true));
-                },
-                exc -> listener.onFailure(new ElasticsearchException(
-                    "Error while updating state for data frame transform [{}] to [{}]", exc,
-                    transform.getId(),
-                    state.getIndexerState()))));
-            break;
-        default:
-            listener.onFailure(new ElasticsearchException("Cannot stop task for data frame transform [{}], because state was [{}]",
-                    transform.getId(), newState));
-            break;
-        }
+
+        getIndexer().stop();
     }
 
     @Override
@@ -280,12 +245,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
 
     /**
      * Attempt to gracefully cleanup the data frame transform so it can be terminated.
-     * This tries to remove the job from the scheduler, and potentially any other
-     * cleanup operations in the future
+     * This tries to remove the job from the scheduler and completes the persistent task
      */
     synchronized void shutdown() {
         try {
-            logger.info("Data frame indexer [" + transform.getId() + "] received abort request, stopping indexer.");
             schedulerEngine.remove(SCHEDULE_NAME + "_" + transform.getId());
             schedulerEngine.unregister(this);
         } catch (Exception e) {
@@ -612,6 +575,13 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
             }
         }
 
+        @Override
+        protected void onStop() {
+            auditor.info(transformConfig.getId(), "Indexer has stopped");
+            logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
+            transformTask.shutdown();
+        }
+
         @Override
         protected void onAbort() {
             auditor.info(transformConfig.getId(), "Received abort request, stopping indexer");

+ 4 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml

@@ -106,6 +106,7 @@ teardown:
   - do:
       data_frame.stop_data_frame_transform:
         transform_id: "airline-transform-start-stop"
+        wait_for_completion: true
   - match: { stopped: true }
 
   - do:
@@ -199,6 +200,7 @@ teardown:
   - do:
       data_frame.stop_data_frame_transform:
         transform_id: "airline-transform-start-later"
+        wait_for_completion: true
   - match: { stopped: true }
 
   - do:
@@ -232,6 +234,8 @@ teardown:
   - do:
       data_frame.stop_data_frame_transform:
         transform_id: "_all"
+        wait_for_completion: true
+
   - match: { stopped: true }
 
   - do: