1
0
Эх сурвалжийг харах

[Data Frame] Refactor PUT transform to not create a task (#39934)

* [Data Frame] Refactor PUT transform such that:

 * POST _start creates the task and starts it
 * GET transforms queries docs instead of tasks
 * POST _stop verifies the stored config exists before trying to stop
the task

* Addressing PR comments

* Refactoring DataFrameFeatureSet#usage, decreasing size returned getTransformConfigurations

* fixing failing usage test
Benjamin Trent 6 жил өмнө
parent
commit
56f3038979
21 өөрчлөгдсөн 883 нэмэгдсэн , 271 устгасан
  1. 6 5
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java
  2. 7 28
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java
  3. 4 3
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java
  4. 164 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java
  5. 1 1
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java
  6. 9 4
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java
  7. 9 1
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java
  8. 3 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java
  9. 33 13
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java
  10. 9 73
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java
  11. 1 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java
  12. 145 59
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java
  13. 191 59
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java
  14. 115 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java
  15. 22 8
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java
  16. 71 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java
  17. 3 3
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java
  18. 30 2
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java
  19. 8 11
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml
  20. 38 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml
  21. 14 1
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

+ 6 - 5
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java

@@ -20,16 +20,17 @@ public class DataFrameMessages {
     public static final String REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION =
             "Failed to validate data frame configuration";
     public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration";
-    public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_TARGET_MAPPINGS = "Failed to deduce target mappings";
-    public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX = "Failed to create target index";
-    public static final String REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK =
-            "Failed to start persistent task, configuration has been cleaned up: [{0}]";
+    public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings";
+    public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX = "Failed to create dest index";
+    public static final String REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS = "dest index [{0}] already exists";
+    public static final String REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist";
     public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID =
             "Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";
 
+    public static final String DATA_FRAME_CONFIG_INVALID = "Data frame transform configuration is invalid [{0}]";
     public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
 
-    public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform[{1}]";
+    public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
     public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION =
             "Failed to load data frame transform configuration for transform [{0}]";
     public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION =

+ 7 - 28
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java

@@ -8,12 +8,10 @@ package org.elasticsearch.xpack.core.dataframe.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.FailedNodeException;
-import org.elasticsearch.action.TaskOperationFailure;
-import org.elasticsearch.action.support.tasks.BaseTasksRequest;
-import org.elasticsearch.action.support.tasks.BaseTasksResponse;
+import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.ParseField;
@@ -25,7 +23,6 @@ import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.tasks.Task;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 
@@ -52,7 +49,7 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
         return new Response();
     }
 
-    public static class Request extends BaseTasksRequest<Request> implements ToXContent {
+    public static class Request extends ActionRequest implements ToXContent {
         private String id;
 
         public Request(String id) {
@@ -63,23 +60,14 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
             }
         }
 
-        private Request() {}
+        public Request() {
+        }
 
         public Request(StreamInput in) throws IOException {
             super(in);
             id = in.readString();
         }
 
-        @Override
-        public boolean match(Task task) {
-            // If we are retrieving all the transforms, the task description does not contain the id
-            if (id.equals(MetaData.ALL)) {
-                return task.getDescription().startsWith(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX);
-            }
-            // Otherwise find the task by ID
-            return task.getDescription().equals(DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + id);
-        }
-
         public String getId() {
             return id;
         }
@@ -126,7 +114,7 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
         }
     }
 
-    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
+    public static class Response extends ActionResponse implements Writeable, ToXContentObject {
 
         public static final String INVALID_TRANSFORMS_DEPRECATION_WARNING = "Found [{}] invalid transforms";
         private static final ParseField INVALID_TRANSFORMS = new ParseField("invalid_transforms");
@@ -134,22 +122,14 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
         private List<DataFrameTransformConfig> transformConfigurations;
 
         public Response(List<DataFrameTransformConfig> transformConfigs) {
-            super(Collections.emptyList(), Collections.emptyList());
-            this.transformConfigurations = transformConfigs;
-        }
-
-        public Response(List<DataFrameTransformConfig> transformConfigs, List<TaskOperationFailure> taskFailures,
-                List<? extends FailedNodeException> nodeFailures) {
-            super(taskFailures, nodeFailures);
             this.transformConfigurations = transformConfigs;
         }
 
         public Response() {
-            super(Collections.emptyList(), Collections.emptyList());
+            this.transformConfigurations = Collections.emptyList();
         }
 
         public Response(StreamInput in) throws IOException {
-            super(in);
             readFrom(in);
         }
 
@@ -173,7 +153,6 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             List<String> invalidTransforms = new ArrayList<>();
             builder.startObject();
-            toXContentCommon(builder, params);
             builder.field(DataFrameField.COUNT.getPreferredName(), transformConfigurations.size());
             // XContentBuilder does not support passing the params object for Iterables
             builder.field(DataFrameField.TRANSFORMS.getPreferredName());

+ 4 - 3
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformAction.java

@@ -9,7 +9,7 @@ package org.elasticsearch.xpack.core.dataframe.action;
 import org.elasticsearch.action.Action;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.support.tasks.BaseTasksRequest;
+import org.elasticsearch.action.support.master.AcknowledgedRequest;
 import org.elasticsearch.action.support.tasks.BaseTasksResponse;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -39,14 +39,15 @@ public class StartDataFrameTransformAction extends Action<StartDataFrameTransfor
         return new Response();
     }
 
-    public static class Request extends BaseTasksRequest<Request> implements ToXContent {
+    public static class Request extends AcknowledgedRequest<Request> implements ToXContent {
+
         private String id;
 
         public Request(String id) {
             this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
         }
 
-        private Request() {
+        public Request() {
         }
 
         public Request(StreamInput in) throws IOException {

+ 164 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/StartDataFrameTransformTaskAction.java

@@ -0,0 +1,164 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.core.dataframe.action;
+
+import org.elasticsearch.action.Action;
+import org.elasticsearch.action.ActionRequestBuilder;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.action.support.tasks.BaseTasksRequest;
+import org.elasticsearch.action.support.tasks.BaseTasksResponse;
+import org.elasticsearch.client.ElasticsearchClient;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.ToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.dataframe.DataFrameField;
+import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Objects;
+
+public class StartDataFrameTransformTaskAction extends Action<StartDataFrameTransformTaskAction.Response> {
+
+    public static final StartDataFrameTransformTaskAction INSTANCE = new StartDataFrameTransformTaskAction();
+    public static final String NAME = "cluster:admin/data_frame/start_task";
+
+    private StartDataFrameTransformTaskAction() {
+        super(NAME);
+    }
+
+    @Override
+    public Response newResponse() {
+        return new Response();
+    }
+
+    public static class Request extends BaseTasksRequest<Request> implements ToXContent {
+
+        private String id;
+
+        public Request(String id) {
+            this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
+        }
+
+        public Request() {
+        }
+
+        public Request(StreamInput in) throws IOException {
+            super(in);
+            id = in.readString();
+        }
+
+        public String getId() {
+            return id;
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeString(id);
+        }
+
+        @Override
+        public ActionRequestValidationException validate() {
+            return null;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.field(DataFrameField.ID.getPreferredName(), id);
+            return builder;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null) {
+                return false;
+            }
+            if (getClass() != obj.getClass()) {
+                return false;
+            }
+            Request other = (Request) obj;
+            return Objects.equals(id, other.id);
+        }
+    }
+
+    public static class RequestBuilder extends ActionRequestBuilder<Request, Response> {
+
+        protected RequestBuilder(ElasticsearchClient client, StartDataFrameTransformTaskAction action) {
+            super(client, action, new Request());
+        }
+    }
+
+    public static class Response extends BaseTasksResponse implements Writeable, ToXContentObject {
+        private boolean started;
+
+        public Response() {
+            super(Collections.emptyList(), Collections.emptyList());
+        }
+
+        public Response(StreamInput in) throws IOException {
+            super(in);
+            readFrom(in);
+        }
+
+        public Response(boolean started) {
+            super(Collections.emptyList(), Collections.emptyList());
+            this.started = started;
+        }
+
+        public boolean isStarted() {
+            return started;
+        }
+
+        @Override
+        public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            started = in.readBoolean();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            out.writeBoolean(started);
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+            toXContentCommon(builder, params);
+            builder.field("started", started);
+            builder.endObject();
+            return builder;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+
+            if (obj == null || getClass() != obj.getClass()) {
+                return false;
+            }
+            Response response = (Response) obj;
+            return started == response.started;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(started);
+        }
+    }
+}

+ 1 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java

@@ -36,7 +36,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
  */
 public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransformConfig> implements Writeable, ToXContentObject {
 
-    private static final String NAME = "data_frame_transform_config";
+    public static final String NAME = "data_frame_transform_config";
     public static final ParseField HEADERS = new ParseField("headers");
     public static final ParseField QUERY = new ParseField("query");
 

+ 9 - 4
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameRestTestCase.java

@@ -218,8 +218,11 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
 
     protected static String getDataFrameIndexerState(String transformId) throws IOException {
         Response statsResponse = client().performRequest(new Request("GET", DATAFRAME_ENDPOINT + transformId + "/_stats"));
-
-        Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
+        List<?> transforms = ((List<?>) entityAsMap(statsResponse).get("transforms"));
+        if (transforms.isEmpty()) {
+            return null;
+        }
+        Map<?, ?> transformStatsAsMap = (Map<?, ?>) transforms.get(0);
         return (String) XContentMapValues.extractValue("state.transform_state", transformStatsAsMap);
     }
 
@@ -234,7 +237,6 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
 
     protected static void wipeDataFrameTransforms() throws IOException, InterruptedException {
         List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
-
         for (Map<String, Object> transformConfig : transformConfigs) {
             String transformId = (String) transformConfig.get("id");
             Request request = new Request("POST", DATAFRAME_ENDPOINT + transformId + "/_stop");
@@ -242,7 +244,10 @@ public abstract class DataFrameRestTestCase extends ESRestTestCase {
             request.addParameter("timeout", "10s");
             request.addParameter("ignore", "404");
             adminClient().performRequest(request);
-            assertEquals("stopped", getDataFrameIndexerState(transformId));
+            String state = getDataFrameIndexerState(transformId);
+            if (state != null) {
+                assertEquals("stopped", getDataFrameIndexerState(transformId));
+            }
         }
 
         for (Map<String, Object> transformConfig : transformConfigs) {

+ 9 - 1
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameUsageIT.java

@@ -35,7 +35,7 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
         indicesCreated = true;
     }
 
-    public void testUsage() throws IOException {
+    public void testUsage() throws Exception {
         Response usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
 
         Map<?, ?> usageAsMap = entityAsMap(usageResponse);
@@ -47,12 +47,20 @@ public class DataFrameUsageIT extends DataFrameRestTestCase {
 
         // create a transform
         createPivotReviewsTransform("test_usage", "pivot_reviews", null);
+        usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
+        usageAsMap = entityAsMap(usageResponse);
+        assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
+        assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.stopped", usageAsMap));
+
+        // TODO remove as soon as stats are stored in an index instead of ClusterState with the task
+        startAndWaitForTransform("test_usage", "pivot_reviews");
 
         usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
 
         usageAsMap = entityAsMap(usageResponse);
         // we should see some stats
         assertEquals(1, XContentMapValues.extractValue("data_frame.transforms._all", usageAsMap));
+        assertEquals(1, XContentMapValues.extractValue("data_frame.transforms.started", usageAsMap));
         assertEquals(0, XContentMapValues.extractValue("data_frame.stats.index_failures", usageAsMap));
     }
 }

+ 3 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrame.java

@@ -46,6 +46,7 @@ import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStats
 import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
+import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
 import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction;
@@ -54,6 +55,7 @@ import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsS
 import org.elasticsearch.xpack.dataframe.action.TransportPreviewDataFrameTransformAction;
 import org.elasticsearch.xpack.dataframe.action.TransportPutDataFrameTransformAction;
 import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformAction;
+import org.elasticsearch.xpack.dataframe.action.TransportStartDataFrameTransformTaskAction;
 import org.elasticsearch.xpack.dataframe.action.TransportStopDataFrameTransformAction;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
@@ -148,6 +150,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
         return Arrays.asList(
                 new ActionHandler<>(PutDataFrameTransformAction.INSTANCE, TransportPutDataFrameTransformAction.class),
                 new ActionHandler<>(StartDataFrameTransformAction.INSTANCE, TransportStartDataFrameTransformAction.class),
+                new ActionHandler<>(StartDataFrameTransformTaskAction.INSTANCE, TransportStartDataFrameTransformTaskAction.class),
                 new ActionHandler<>(StopDataFrameTransformAction.INSTANCE, TransportStopDataFrameTransformAction.class),
                 new ActionHandler<>(DeleteDataFrameTransformAction.INSTANCE, TransportDeleteDataFrameTransformAction.class),
                 new ActionHandler<>(GetDataFrameTransformsAction.INSTANCE, TransportGetDataFrameTransformsAction.class),

+ 33 - 13
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java

@@ -17,13 +17,18 @@ import org.elasticsearch.xpack.core.XPackFeatureSet;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
+import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class DataFrameFeatureSet implements XPackFeatureSet {
 
@@ -71,18 +76,33 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
             return;
         }
 
-        GetDataFrameTransformsStatsAction.Request transformStatsRequest = new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
-
-        client.execute(GetDataFrameTransformsStatsAction.INSTANCE, transformStatsRequest, ActionListener.wrap(transformStatsResponse -> {
-            Map<String, Long> transformsCountByState = new HashMap<>();
-            DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats();
-
-            transformStatsResponse.getTransformsStateAndStats().stream().forEach(singleResult -> {
-                transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum);
-                accumulatedStats.merge(singleResult.getTransformStats());
-            });
-
-            listener.onResponse(new DataFrameFeatureSetUsage(available(), enabled(), transformsCountByState, accumulatedStats));
-        }, listener::onFailure));
+        GetDataFrameTransformsAction.Request transformsRequest = new GetDataFrameTransformsAction.Request(MetaData.ALL);
+        client.execute(GetDataFrameTransformsAction.INSTANCE, transformsRequest, ActionListener.wrap(
+            transforms -> {
+                Set<String> transformIds = transforms.getTransformConfigurations()
+                    .stream()
+                    .map(DataFrameTransformConfig::getId)
+                    .collect(Collectors.toSet());
+                GetDataFrameTransformsStatsAction.Request transformStatsRequest =
+                    new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
+                client.execute(GetDataFrameTransformsStatsAction.INSTANCE,
+                    transformStatsRequest,
+                    ActionListener.wrap(transformStatsResponse -> {
+                        Map<String, Long> transformsCountByState = new HashMap<>();
+                        DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats();
+
+                        transformStatsResponse.getTransformsStateAndStats().forEach(singleResult -> {
+                            transformIds.remove(singleResult.getId());
+                            transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum);
+                            accumulatedStats.merge(singleResult.getTransformStats());
+                        });
+                        // If there is no state returned, assumed stopped
+                        transformIds.forEach(ignored -> transformsCountByState.merge(IndexerState.STOPPED.value(), 1L, Long::sum));
+
+                    listener.onResponse(new DataFrameFeatureSetUsage(available(), enabled(), transformsCountByState, accumulatedStats));
+                }, listener::onFailure));
+            },
+            listener::onFailure
+        ));
     }
 }

+ 9 - 73
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java

@@ -7,98 +7,34 @@
 package org.elasticsearch.xpack.dataframe.action;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ActionListenerResponseHandler;
-import org.elasticsearch.action.FailedNodeException;
-import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.tasks.TransportTasksAction;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.discovery.MasterNotDiscoveredException;
 import org.elasticsearch.tasks.Task;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Request;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Response;
-import org.elasticsearch.xpack.dataframe.persistence.DataFramePersistentTaskUtils;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
-import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
 
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
 
-public class TransportGetDataFrameTransformsAction extends
-        TransportTasksAction<DataFrameTransformTask,
-        GetDataFrameTransformsAction.Request,
-        GetDataFrameTransformsAction.Response,
-        GetDataFrameTransformsAction.Response> {
+public class TransportGetDataFrameTransformsAction extends HandledTransportAction<Request, Response> {
 
     private final DataFrameTransformsConfigManager transformsConfigManager;
 
     @Inject
     public TransportGetDataFrameTransformsAction(TransportService transportService, ActionFilters actionFilters,
-            ClusterService clusterService, DataFrameTransformsConfigManager transformsConfigManager) {
-        super(GetDataFrameTransformsAction.NAME, clusterService, transportService, actionFilters, GetDataFrameTransformsAction.Request::new,
-                GetDataFrameTransformsAction.Response::new, GetDataFrameTransformsAction.Response::new, ThreadPool.Names.SAME);
+                                                 DataFrameTransformsConfigManager transformsConfigManager) {
+        super(GetDataFrameTransformsAction.NAME, transportService, actionFilters, () -> new Request());
         this.transformsConfigManager = transformsConfigManager;
     }
 
-    @Override
-    protected Response newResponse(Request request, List<Response> tasks, List<TaskOperationFailure> taskOperationFailures,
-            List<FailedNodeException> failedNodeExceptions) {
-        List<DataFrameTransformConfig> configs = tasks.stream()
-            .flatMap(r -> r.getTransformConfigurations().stream())
-            .sorted(Comparator.comparing(DataFrameTransformConfig::getId))
-            .collect(Collectors.toList());
-        return new Response(configs, taskOperationFailures, failedNodeExceptions);
-    }
-
-    @Override
-    protected void taskOperation(Request request, DataFrameTransformTask task, ActionListener<Response> listener) {
-        assert task.getTransformId().equals(request.getId()) || request.getId().equals(MetaData.ALL);
-        // Little extra insurance, make sure we only return transforms that aren't cancelled
-        if (task.isCancelled() == false) {
-            transformsConfigManager.getTransformConfiguration(task.getTransformId(), ActionListener.wrap(config -> {
-                listener.onResponse(new Response(Collections.singletonList(config)));
-            }, e -> {
-                listener.onFailure(new RuntimeException("failed to retrieve...", e));
-            }));
-        } else {
-            listener.onResponse(new Response(Collections.emptyList()));
-        }
-    }
-
     @Override
     protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
-        final ClusterState state = clusterService.state();
-        final DiscoveryNodes nodes = state.nodes();
-
-        if (nodes.isLocalNodeElectedMaster()) {
-            if (DataFramePersistentTaskUtils.stateHasDataFrameTransforms(request.getId(), state)) {
-                super.doExecute(task, request, listener);
-            } else {
-                // If we couldn't find the transform in the persistent task CS, it means it was deleted prior to this GET
-                // and we can just send an empty response, no need to go looking for the allocated task
-                listener.onResponse(new Response(Collections.emptyList()));
-            }
-
-        } else {
-            // Delegates GetTransforms to elected master node, so it becomes the coordinating node.
-            // Non-master nodes may have a stale cluster state that shows transforms which are cancelled
-            // on the master, which makes testing difficult.
-            if (nodes.getMasterNode() == null) {
-                listener.onFailure(new MasterNotDiscoveredException("no known master nodes"));
-            } else {
-                transportService.sendRequest(nodes.getMasterNode(), actionName, request,
-                        new ActionListenerResponseHandler<>(listener, Response::new));
-            }
-        }
+        //TODO support comma delimited and simple regex IDs
+        transformsConfigManager.getTransformConfigurations(request.getId(), ActionListener.wrap(
+            configs -> listener.onResponse(new Response(configs)),
+            listener::onFailure
+        ));
     }
 }

+ 1 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java

@@ -73,6 +73,7 @@ public class TransportGetDataFrameTransformsStatsAction extends
     }
 
     @Override
+    // TODO gather stats from docs when moved out of allocated task
     protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
         final ClusterState state = clusterService.state();
         final DiscoveryNodes nodes = state.nodes();

+ 145 - 59
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java

@@ -8,9 +8,13 @@ package org.elasticsearch.xpack.dataframe.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceAlreadyExistsException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
@@ -18,27 +22,39 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.index.query.MatchAllQueryBuilder;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksService;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction;
 import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Request;
 import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Response;
+import org.elasticsearch.xpack.core.security.SecurityContext;
+import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
+import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
+import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
+import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
+import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges;
+import org.elasticsearch.xpack.core.security.support.Exceptions;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.persistence.DataframeIndex;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -48,21 +64,23 @@ public class TransportPutDataFrameTransformAction
     private static final Logger logger = LogManager.getLogger(TransportPutDataFrameTransformAction.class);
 
     private final XPackLicenseState licenseState;
-    private final PersistentTasksService persistentTasksService;
     private final Client client;
     private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
+    private final SecurityContext securityContext;
 
     @Inject
-    public TransportPutDataFrameTransformAction(TransportService transportService, ThreadPool threadPool, ActionFilters actionFilters,
-            IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, XPackLicenseState licenseState,
-            PersistentTasksService persistentTasksService, DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
-            Client client) {
+    public TransportPutDataFrameTransformAction(Settings settings, TransportService transportService, ThreadPool threadPool,
+                                                ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
+                                                ClusterService clusterService, XPackLicenseState licenseState,
+                                                PersistentTasksService persistentTasksService,
+                                                DataFrameTransformsConfigManager dataFrameTransformsConfigManager, Client client) {
         super(PutDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
                 PutDataFrameTransformAction.Request::new);
         this.licenseState = licenseState;
-        this.persistentTasksService = persistentTasksService;
         this.client = client;
         this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
+        this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
+            new SecurityContext(settings, threadPool.getThreadContext()) : null;
     }
 
     @Override
@@ -101,62 +119,130 @@ public class TransportPutDataFrameTransformAction
             return;
         }
 
-        // create the transform, for now we only have pivot and no support for custom queries
-        Pivot pivot = new Pivot(config.getSource(), new MatchAllQueryBuilder(), config.getPivotConfig());
-
-        // the non-state creating steps are done first, so we minimize the chance to end up with orphaned state transform validation
-        pivot.validate(client, ActionListener.wrap(validationResult -> {
-            // deduce target mappings
-            pivot.deduceMappings(client, ActionListener.wrap(mappings -> {
-                // create the destination index
-                DataframeIndex.createDestinationIndex(client, config, mappings, ActionListener.wrap(createIndexResult -> {
-                    DataFrameTransform transform = createDataFrameTransform(transformId, threadPool);
-                    // create the transform configuration and store it in the internal index
-                    dataFrameTransformsConfigManager.putTransformConfiguration(config, ActionListener.wrap(r -> {
-                        // finally start the persistent task
-                        persistentTasksService.sendStartRequest(transform.getId(), DataFrameTransform.NAME, transform,
-                                ActionListener.wrap(persistentTask -> {
-                                    listener.onResponse(new PutDataFrameTransformAction.Response(true));
-                        }, startPersistentTaskException -> {
-                            // delete the otherwise orphaned transform configuration, for now we do not delete the destination index
-                            dataFrameTransformsConfigManager.deleteTransformConfiguration(transformId, ActionListener.wrap(r2 -> {
-                                        logger.debug("Deleted data frame transform [{}] configuration from data frame configuration index",
-                                                transformId);
-                                        listener.onFailure(
-                                        new RuntimeException(
-                                                DataFrameMessages.getMessage(
-                                                        DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK, r2),
-                                                startPersistentTaskException));
-                            }, deleteTransformFromIndexException -> {
-                                logger.error("Failed to cleanup orphaned data frame transform [{}] configuration", transformId);
-                                listener.onFailure(
-                                        new RuntimeException(
-                                                DataFrameMessages.getMessage(
-                                                        DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK, false),
-                                                startPersistentTaskException));
-                            }));
-                        }));
-                    }, listener::onFailure));
-                }, createDestinationIndexException -> {
-                    listener.onFailure(new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX,
-                            createDestinationIndexException));
-                }));
-            }, deduceTargetMappingsException -> {
-                listener.onFailure(new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_TARGET_MAPPINGS,
-                        deduceTargetMappingsException));
-            }));
-        }, validationException -> {
-            listener.onFailure(new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
-                    validationException));
-        }));
-    }
+        String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState,
+            IndicesOptions.lenientExpandOpen(),
+            config.getDestination());
+
+        if (dest.length > 0) {
+            listener.onFailure(new ElasticsearchStatusException(
+                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_INDEX_ALREADY_EXISTS, config.getDestination()),
+                RestStatus.BAD_REQUEST));
+            return;
+        }
+
+        String[] src = indexNameExpressionResolver.concreteIndexNames(clusterState,
+            IndicesOptions.lenientExpandOpen(),
+            config.getSource());
+        if (src.length == 0) {
+            listener.onFailure(new ElasticsearchStatusException(
+                DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, config.getSource()),
+                RestStatus.BAD_REQUEST));
+            return;
+        }
 
-    private static DataFrameTransform createDataFrameTransform(String transformId, ThreadPool threadPool) {
-        return new DataFrameTransform(transformId);
+        // Early check to verify that the user can create the destination index and can read from the source
+        if (licenseState.isAuthAllowed()) {
+            final String username = securityContext.getUser().principal();
+            RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
+                .indices(config.getSource())
+                .privileges("read")
+                .build();
+            RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
+                .indices(config.getDestination())
+                .privileges("read", "index", "create_index")
+                .build();
+
+            HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
+            privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
+            privRequest.username(username);
+            privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
+            privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges);
+
+            ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
+                r -> handlePrivsResponse(username, config, r, listener),
+                listener::onFailure);
+
+            client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
+        } else { // No security enabled, just create the transform
+            putDataFrame(config, listener);
+        }
     }
 
     @Override
     protected ClusterBlockException checkBlock(PutDataFrameTransformAction.Request request, ClusterState state) {
         return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
     }
+
+    private void handlePrivsResponse(String username,
+                                     DataFrameTransformConfig config,
+                                     HasPrivilegesResponse privilegesResponse,
+                                     ActionListener<Response> listener) throws IOException {
+        if (privilegesResponse.isCompleteMatch()) {
+            putDataFrame(config, listener);
+        } else {
+            XContentBuilder builder = JsonXContent.contentBuilder();
+            builder.startObject();
+            for (ResourcePrivileges index : privilegesResponse.getIndexPrivileges()) {
+                builder.field(index.getResource());
+                builder.map(index.getPrivileges());
+            }
+            builder.endObject();
+
+            listener.onFailure(Exceptions.authorizationError("Cannot create data frame transform [{}]" +
+                    " because user {} lacks permissions on the indices: {}",
+                config.getId(), username, Strings.toString(builder)));
+        }
+    }
+
+    private void putDataFrame(DataFrameTransformConfig config, ActionListener<Response> listener) {
+
+        final Pivot pivot = new Pivot(config.getSource(), config.getQueryConfig().getQuery(), config.getPivotConfig());
+
+
+        // <5> Return the listener, or clean up destination index on failure.
+        ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(
+            putTransformConfigurationResult -> listener.onResponse(new Response(true)),
+            putTransformConfigurationException ->
+                ClientHelper.executeAsyncWithOrigin(client,
+                    ClientHelper.DATA_FRAME_ORIGIN,
+                    DeleteIndexAction.INSTANCE,
+                    new DeleteIndexRequest(config.getDestination()), ActionListener.wrap(
+                        deleteIndexResponse -> listener.onFailure(putTransformConfigurationException),
+                        deleteIndexException -> {
+                            String msg = "Failed to delete destination index after creating transform [" + config.getId() + "] failed";
+                            listener.onFailure(
+                                new ElasticsearchStatusException(msg,
+                                    RestStatus.INTERNAL_SERVER_ERROR,
+                                    putTransformConfigurationException));
+                        })
+                )
+        );
+
+        // <4> Put our transform
+        ActionListener<Boolean> createDestinationIndexListener = ActionListener.wrap(
+            createIndexResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener),
+            createDestinationIndexException -> listener.onFailure(
+                new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX,
+                    createDestinationIndexException))
+        );
+
+        // <3> Create the destination index
+        ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(
+            mappings -> DataframeIndex.createDestinationIndex(client, config, mappings, createDestinationIndexListener),
+            deduceTargetMappingsException -> listener.onFailure(
+                new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS,
+                    deduceTargetMappingsException))
+        );
+
+        // <2> Deduce our mappings for the destination index
+        ActionListener<Boolean> pivotValidationListener = ActionListener.wrap(
+            validationResult -> pivot.deduceMappings(client, deduceMappingsListener),
+            validationException -> listener.onFailure(
+                new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
+                    validationException))
+        );
+
+        // <1> Validate our pivot
+        pivot.validate(client, pivotValidationListener);
+    }
 }

+ 191 - 59
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java

@@ -6,106 +6,238 @@
 
 package org.elasticsearch.xpack.dataframe.action;
 
-import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.FailedNodeException;
-import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.tasks.TransportTasksAction;
+import org.elasticsearch.action.support.master.TransportMasterNodeAction;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.block.ClusterBlockLevel;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.license.LicenseUtils;
 import org.elasticsearch.license.XPackLicenseState;
-import org.elasticsearch.tasks.Task;
+import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
+import org.elasticsearch.persistent.PersistentTasksService;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.XPackField;
+import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
-import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
+import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 public class TransportStartDataFrameTransformAction extends
-        TransportTasksAction<DataFrameTransformTask, StartDataFrameTransformAction.Request,
-        StartDataFrameTransformAction.Response, StartDataFrameTransformAction.Response> {
+    TransportMasterNodeAction<StartDataFrameTransformAction.Request, StartDataFrameTransformAction.Response> {
 
     private final XPackLicenseState licenseState;
+    private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
+    private final PersistentTasksService persistentTasksService;
+    private final Client client;
 
     @Inject
     public TransportStartDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
-            ClusterService clusterService, XPackLicenseState licenseState) {
-        super(StartDataFrameTransformAction.NAME, clusterService, transportService, actionFilters,
-                StartDataFrameTransformAction.Request::new, StartDataFrameTransformAction.Response::new,
-                StartDataFrameTransformAction.Response::new, ThreadPool.Names.SAME);
+                                                  ClusterService clusterService, XPackLicenseState licenseState,
+                                                  ThreadPool threadPool, IndexNameExpressionResolver indexNameExpressionResolver,
+                                                  DataFrameTransformsConfigManager dataFrameTransformsConfigManager,
+                                                  PersistentTasksService persistentTasksService, Client client) {
+        super(StartDataFrameTransformAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
+            StartDataFrameTransformAction.Request::new);
         this.licenseState = licenseState;
+        this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
+        this.persistentTasksService = persistentTasksService;
+        this.client = client;
     }
 
     @Override
-    protected void processTasks(StartDataFrameTransformAction.Request request, Consumer<DataFrameTransformTask> operation) {
-        DataFrameTransformTask matchingTask = null;
-
-        // todo: re-factor, see rollup TransportTaskHelper
-        for (Task task : taskManager.getTasks().values()) {
-            if (task instanceof DataFrameTransformTask
-                    && ((DataFrameTransformTask) task).getTransformId().equals(request.getId())) {
-                if (matchingTask != null) {
-                    throw new IllegalArgumentException("Found more than one matching task for data frame transform [" + request.getId()
-                            + "] when " + "there should only be one.");
-                }
-                matchingTask = (DataFrameTransformTask) task;
-            }
-        }
-
-        if (matchingTask != null) {
-            operation.accept(matchingTask);
-        }
+    protected String executor() {
+        return ThreadPool.Names.SAME;
     }
 
     @Override
-    protected void doExecute(Task task, StartDataFrameTransformAction.Request request,
-            ActionListener<StartDataFrameTransformAction.Response> listener) {
+    protected StartDataFrameTransformAction.Response newResponse() {
+        return new StartDataFrameTransformAction.Response();
+    }
 
+    @Override
+    protected void masterOperation(StartDataFrameTransformAction.Request request,
+                                   ClusterState state,
+                                   ActionListener<StartDataFrameTransformAction.Response> listener) throws Exception {
         if (!licenseState.isDataFrameAllowed()) {
             listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME));
             return;
         }
+        final DataFrameTransform transformTask = createDataFrameTransform(request.getId(), threadPool);
+
+        // <3> Set the allocated task's state to STARTED
+        ActionListener<PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>> persistentTaskActionListener = ActionListener.wrap(
+            task -> {
+                waitForDataFrameTaskAllocated(task.getId(),
+                    transformTask,
+                    request.timeout(),
+                    ActionListener.wrap(
+                        taskAssigned -> ClientHelper.executeAsyncWithOrigin(client,
+                            ClientHelper.DATA_FRAME_ORIGIN,
+                            StartDataFrameTransformTaskAction.INSTANCE,
+                            new StartDataFrameTransformTaskAction.Request(request.getId()),
+                            ActionListener.wrap(
+                                r -> listener.onResponse(new StartDataFrameTransformAction.Response(true)),
+                                startingFailure -> cancelDataFrameTask(task.getId(),
+                                    transformTask.getId(),
+                                    startingFailure,
+                                    listener::onFailure)
+                                )),
+                        listener::onFailure));
+            },
+            listener::onFailure
+        );
+
+        // <2> Create the task in cluster state so that it will start executing on the node
+        ActionListener<DataFrameTransformConfig> getTransformListener = ActionListener.wrap(
+            config -> {
+                if (config.isValid() == false) {
+                    listener.onFailure(new ElasticsearchStatusException(
+                        DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_CONFIG_INVALID, request.getId()),
+                        RestStatus.BAD_REQUEST
+                    ));
+                    return;
+                }
+                PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> existingTask =
+                    getExistingTask(transformTask.getId(), state);
+                if (existingTask == null) {
+                    persistentTasksService.sendStartRequest(transformTask.getId(),
+                        DataFrameTransform.NAME,
+                        transformTask,
+                        persistentTaskActionListener);
+                } else {
+                    persistentTaskActionListener.onResponse(existingTask);
+                }
+            },
+            listener::onFailure
+        );
 
-        super.doExecute(task, request, listener);
+        // <1> Get the config to verify it exists and is valid
+        dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), getTransformListener);
     }
 
     @Override
-    protected void taskOperation(StartDataFrameTransformAction.Request request, DataFrameTransformTask transformTask,
-            ActionListener<StartDataFrameTransformAction.Response> listener) {
-        if (transformTask.getTransformId().equals(request.getId())) {
-            transformTask.start(listener);
+    protected ClusterBlockException checkBlock(StartDataFrameTransformAction.Request request, ClusterState state) {
+        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
+    }
+
+    private static DataFrameTransform createDataFrameTransform(String transformId, ThreadPool threadPool) {
+        return new DataFrameTransform(transformId);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> getExistingTask(String id, ClusterState state) {
+        PersistentTasksCustomMetaData pTasksMeta = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
+        if (pTasksMeta == null) {
+            return null;
+        }
+        Collection<PersistentTasksCustomMetaData.PersistentTask<?>> existingTask = pTasksMeta.findTasks(DataFrameTransform.NAME,
+            t -> t.getId().equals(id));
+        if (existingTask.isEmpty()) {
+            return null;
         } else {
-            listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
-                    + "] does not match request's ID [" + request.getId() + "]"));
+            assert(existingTask.size() == 1);
+            PersistentTasksCustomMetaData.PersistentTask<?> pTask = existingTask.iterator().next();
+            if (pTask.getParams() instanceof DataFrameTransform) {
+                return (PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>)pTask;
+            }
+            throw new ElasticsearchStatusException("Found data frame transform persistent task [" + id + "] with incorrect params",
+                RestStatus.INTERNAL_SERVER_ERROR);
         }
     }
 
-    @Override
-    protected StartDataFrameTransformAction.Response newResponse(StartDataFrameTransformAction.Request request,
-            List<StartDataFrameTransformAction.Response> tasks, List<TaskOperationFailure> taskOperationFailures,
-            List<FailedNodeException> failedNodeExceptions) {
-
-        if (taskOperationFailures.isEmpty() == false) {
-            throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause());
-        } else if (failedNodeExceptions.isEmpty() == false) {
-            throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0));
-        }
+    private void cancelDataFrameTask(String taskId, String dataFrameId, Exception exception, Consumer<Exception> onFailure) {
+        persistentTasksService.sendRemoveRequest(taskId,
+            new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
+                @Override
+                public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
+                    // We succeeded in cancelling the persistent task, but the
+                    // problem that caused us to cancel it is the overall result
+                    onFailure.accept(exception);
+                }
 
-        // Either the transform doesn't exist (the user didn't create it yet) or was deleted
-        // after the StartAPI executed.
-        // In either case, let the user know
-        if (tasks.size() == 0) {
-            throw new ResourceNotFoundException("Task for data frame transform [" + request.getId() + "] not found");
-        }
+                @Override
+                public void onFailure(Exception e) {
+                    logger.error("[" + dataFrameId + "] Failed to cancel persistent task that could " +
+                        "not be assigned due to [" + exception.getMessage() + "]", e);
+                    onFailure.accept(exception);
+                }
+            }
+        );
+    }
+
+    private void waitForDataFrameTaskAllocated(String taskId,
+                                               DataFrameTransform params,
+                                               TimeValue timeout,
+                                               ActionListener<Boolean> listener) {
+        DataFramePredicate predicate = new DataFramePredicate();
+        persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, timeout,
+            new PersistentTasksService.WaitForPersistentTaskListener<DataFrameTransform>() {
+                @Override
+                public void onResponse(PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform>
+                                           persistentTask) {
+                    if (predicate.exception != null) {
+                        // We want to return to the caller without leaving an unassigned persistent task
+                        cancelDataFrameTask(taskId, params.getId(), predicate.exception, listener::onFailure);
+                    } else {
+                        listener.onResponse(true);
+                    }
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e);
+                }
+
+                @Override
+                public void onTimeout(TimeValue timeout) {
+                    listener.onFailure(new ElasticsearchException("Starting dataframe ["
+                        + params.getId() + "] timed out after [" + timeout + "]"));
+                }
+            });
+    }
 
-        assert tasks.size() == 1;
+    /**
+     * Important: the methods of this class must NOT throw exceptions.  If they did then the callers
+     * of endpoints waiting for a condition tested by this predicate would never get a response.
+     */
+    private class DataFramePredicate implements Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> {
 
-        boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformAction.Response::isStarted);
-        return new StartDataFrameTransformAction.Response(allStarted);
+        private volatile Exception exception;
+
+        @Override
+        public boolean test(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
+            if (persistentTask == null) {
+                return false;
+            }
+            PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment();
+            if (assignment != null &&
+                assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false &&
+                assignment.isAssigned() == false) {
+                // For some reason, the task is not assigned to a node, but is no longer in the `INITIAL_ASSIGNMENT` state
+                // Consider this a failure.
+                exception = new ElasticsearchStatusException("Could not start dataframe, allocation explanation [" +
+                    assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS);
+                return true;
+            }
+            // We just want it assigned so we can tell it to start working
+            return assignment != null && assignment.isAssigned();
+        }
     }
 }

+ 115 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformTaskAction.java

@@ -0,0 +1,115 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.action;
+
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.tasks.TransportTasksAction;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.license.LicenseUtils;
+import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.XPackField;
+import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
+import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Internal only transport class to change an allocated persistent task's state to started
+ */
+public class TransportStartDataFrameTransformTaskAction extends
+    TransportTasksAction<DataFrameTransformTask, StartDataFrameTransformTaskAction.Request,
+        StartDataFrameTransformTaskAction.Response, StartDataFrameTransformTaskAction.Response> {
+
+    private final XPackLicenseState licenseState;
+
+    @Inject
+    public TransportStartDataFrameTransformTaskAction(TransportService transportService, ActionFilters actionFilters,
+                                                  ClusterService clusterService, XPackLicenseState licenseState) {
+        super(StartDataFrameTransformTaskAction.NAME, clusterService, transportService, actionFilters,
+            StartDataFrameTransformTaskAction.Request::new, StartDataFrameTransformTaskAction.Response::new,
+            StartDataFrameTransformTaskAction.Response::new, ThreadPool.Names.SAME);
+        this.licenseState = licenseState;
+    }
+
+    @Override
+    protected void processTasks(StartDataFrameTransformTaskAction.Request request, Consumer<DataFrameTransformTask> operation) {
+        DataFrameTransformTask matchingTask = null;
+
+        // todo: re-factor, see rollup TransportTaskHelper
+        for (Task task : taskManager.getTasks().values()) {
+            if (task instanceof DataFrameTransformTask
+                && ((DataFrameTransformTask) task).getTransformId().equals(request.getId())) {
+                if (matchingTask != null) {
+                    throw new IllegalArgumentException("Found more than one matching task for data frame transform [" + request.getId()
+                        + "] when " + "there should only be one.");
+                }
+                matchingTask = (DataFrameTransformTask) task;
+            }
+        }
+
+        if (matchingTask != null) {
+            operation.accept(matchingTask);
+        }
+    }
+
+    @Override
+    protected void doExecute(Task task, StartDataFrameTransformTaskAction.Request request,
+                             ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
+
+        if (!licenseState.isDataFrameAllowed()) {
+            listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME));
+            return;
+        }
+
+        super.doExecute(task, request, listener);
+    }
+
+    @Override
+    protected void taskOperation(StartDataFrameTransformTaskAction.Request request, DataFrameTransformTask transformTask,
+                                 ActionListener<StartDataFrameTransformTaskAction.Response> listener) {
+        if (transformTask.getTransformId().equals(request.getId())) {
+            transformTask.start(listener);
+        } else {
+            listener.onFailure(new RuntimeException("ID of data frame transform task [" + transformTask.getTransformId()
+                + "] does not match request's ID [" + request.getId() + "]"));
+        }
+    }
+
+    @Override
+    protected StartDataFrameTransformTaskAction.Response newResponse(StartDataFrameTransformTaskAction.Request request,
+                                                                     List<StartDataFrameTransformTaskAction.Response> tasks,
+                                                                     List<TaskOperationFailure> taskOperationFailures,
+                                                                     List<FailedNodeException> failedNodeExceptions) {
+
+        if (taskOperationFailures.isEmpty() == false) {
+            throw org.elasticsearch.ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause());
+        } else if (failedNodeExceptions.isEmpty() == false) {
+            throw org.elasticsearch.ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0));
+        }
+
+        // Either the transform doesn't exist (the user didn't create it yet) or was deleted
+        // after the StartAPI executed.
+        // In either case, let the user know
+        if (tasks.size() == 0) {
+            throw new ResourceNotFoundException("Task for data frame transform [" + request.getId() + "] not found");
+        }
+
+        assert tasks.size() == 1;
+
+        boolean allStarted = tasks.stream().allMatch(StartDataFrameTransformTaskAction.Response::isStarted);
+        return new StartDataFrameTransformTaskAction.Response(allStarted);
+    }
+}

+ 22 - 8
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java

@@ -7,8 +7,6 @@ package org.elasticsearch.xpack.dataframe.action;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchTimeoutException;
-import org.elasticsearch.ExceptionsHelper;
-import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.TaskOperationFailure;
@@ -22,10 +20,12 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
 
 import java.util.List;
 
+import static org.elasticsearch.ExceptionsHelper.convertToElastic;
 import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
 
 public class TransportStopDataFrameTransformAction extends
@@ -34,19 +34,26 @@ public class TransportStopDataFrameTransformAction extends
 
     private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);
     private final ThreadPool threadPool;
+    private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
 
     @Inject
     public TransportStopDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
-            ClusterService clusterService, ThreadPool threadPool) {
+                                                 ClusterService clusterService, ThreadPool threadPool,
+                                                 DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
         super(StopDataFrameTransformAction.NAME, clusterService, transportService, actionFilters, StopDataFrameTransformAction.Request::new,
                 StopDataFrameTransformAction.Response::new, StopDataFrameTransformAction.Response::new, ThreadPool.Names.SAME);
         this.threadPool = threadPool;
+        this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
     }
 
     @Override
     protected void doExecute(Task task, StopDataFrameTransformAction.Request request,
             ActionListener<StopDataFrameTransformAction.Response> listener) {
-        super.doExecute(task, request, listener);
+        // Need to verify that the config actually exists
+        dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), ActionListener.wrap(
+            config -> super.doExecute(task, request, listener),
+            listener::onFailure
+        ));
     }
 
     @Override
@@ -101,16 +108,23 @@ public class TransportStopDataFrameTransformAction extends
             List<FailedNodeException> failedNodeExceptions) {
 
         if (taskOperationFailures.isEmpty() == false) {
-            throw ExceptionsHelper.convertToElastic(taskOperationFailures.get(0).getCause());
+            throw convertToElastic(taskOperationFailures.get(0).getCause());
         } else if (failedNodeExceptions.isEmpty() == false) {
-            throw ExceptionsHelper.convertToElastic(failedNodeExceptions.get(0));
+            throw convertToElastic(failedNodeExceptions.get(0));
         }
 
         // Either the transform doesn't exist (the user didn't create it yet) or was deleted
         // after the Stop API executed.
         // In either case, let the user know
         if (tasks.size() == 0) {
-            throw new ResourceNotFoundException("Task for Data Frame transform [" + request.getId() + "] not found");
+            if (taskOperationFailures.isEmpty() == false) {
+                throw convertToElastic(taskOperationFailures.get(0).getCause());
+            } else if (failedNodeExceptions.isEmpty() == false) {
+                throw convertToElastic(failedNodeExceptions.get(0));
+            } else {
+                // This can happen we the actual task in the node no longer exists, or was never started
+                return new StopDataFrameTransformAction.Response(true);
+            }
         }
 
         assert tasks.size() == 1;
@@ -118,4 +132,4 @@ public class TransportStopDataFrameTransformAction extends
         boolean allStopped = tasks.stream().allMatch(StopDataFrameTransformAction.Response::isStopped);
         return new StopDataFrameTransformAction.Response(allStopped);
     }
-}
+}

+ 71 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java

@@ -20,8 +20,12 @@ import org.elasticsearch.action.get.GetAction;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -32,14 +36,21 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@@ -47,6 +58,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 public class DataFrameTransformsConfigManager {
 
     private static final Logger logger = LogManager.getLogger(DataFrameTransformsConfigManager.class);
+    private static final int DEFAULT_SIZE = 100;
 
     public static final Map<String, String> TO_XCONTENT_PARAMS = Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true");
 
@@ -110,6 +122,53 @@ public class DataFrameTransformsConfigManager {
         }));
     }
 
+    /**
+     * Get more than one DataFrameTransformConfig
+     *
+     * @param transformId Can be a single transformId, `*`, or `_all`
+     * @param resultListener Listener to alert when request is completed
+     */
+    // TODO add pagination support
+    public void getTransformConfigurations(String transformId,
+                                           ActionListener<List<DataFrameTransformConfig>> resultListener) {
+        final boolean isAllOrWildCard = Strings.isAllOrWildcard(new String[]{transformId});
+        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
+            .filter(QueryBuilders.termQuery("doc_type", DataFrameTransformConfig.NAME));
+        if (isAllOrWildCard == false) {
+            queryBuilder.filter(QueryBuilders.termQuery(DataFrameField.ID.getPreferredName(), transformId));
+        }
+
+        SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
+            .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
+            .setTrackTotalHits(true)
+            .setSize(DEFAULT_SIZE)
+            .setQuery(queryBuilder)
+            .request();
+
+        ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request,
+            ActionListener.<SearchResponse>wrap(
+                searchResponse -> {
+                    List<DataFrameTransformConfig> configs = new ArrayList<>(searchResponse.getHits().getHits().length);
+                    for (SearchHit hit : searchResponse.getHits().getHits()) {
+                        DataFrameTransformConfig config = parseTransformLenientlyFromSourceSync(hit.getSourceRef(),
+                            resultListener::onFailure);
+                        if (config == null) {
+                            return;
+                        }
+                        configs.add(config);
+                    }
+                    if (configs.isEmpty() && (isAllOrWildCard == false)) {
+                        resultListener.onFailure(new ResourceNotFoundException(
+                            DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
+                        return;
+                    }
+                    resultListener.onResponse(configs);
+                },
+                resultListener::onFailure)
+        , client::search);
+
+    }
+
     public void deleteTransformConfiguration(String transformId, ActionListener<Boolean> listener) {
         DeleteRequest request = new DeleteRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId));
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -132,6 +191,18 @@ public class DataFrameTransformsConfigManager {
         }));
     }
 
+    private DataFrameTransformConfig parseTransformLenientlyFromSourceSync(BytesReference source, Consumer<Exception> onFailure) {
+        try (InputStream stream = source.streamInput();
+             XContentParser parser = XContentFactory.xContent(XContentType.JSON)
+                 .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
+            return DataFrameTransformConfig.fromXContent(parser, null, true);
+        } catch (Exception e) {
+            logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_CONFIGURATION), e);
+            onFailure.accept(e);
+            return null;
+        }
+    }
+
     private void parseTransformLenientlyFromSource(BytesReference source, String transformId,
             ActionListener<DataFrameTransformConfig> transformListener) {
         try (InputStream stream = source.streamInput();

+ 3 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

@@ -31,8 +31,8 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
-import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
-import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction.Response;
+import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
+import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
 import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
 import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 
@@ -141,7 +141,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
                         (task) -> {
                             logger.debug("Successfully updated state for data frame transform [" + transform.getId() + "] to ["
                                     + state.getIndexerState() + "][" + state.getPosition() + "]");
-                            listener.onResponse(new StartDataFrameTransformAction.Response(true));
+                            listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
                         }, (exc) -> {
                             // We were unable to update the persistent status, so we need to shutdown the indexer too.
                             indexer.stop();

+ 30 - 2
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java

@@ -19,11 +19,15 @@ import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.core.XPackFeatureSet;
 import org.elasticsearch.xpack.core.XPackFeatureSet.Usage;
+import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.junit.Before;
 
 import java.io.IOException;
@@ -80,7 +84,21 @@ public class DataFrameFeatureSetTests extends ESTestCase {
             transformsStateAndStats.add(DataFrameTransformStateAndStatsTests.randomDataFrameTransformStateAndStats());
         }
 
+        List<DataFrameTransformConfig> transformConfigWithoutTasks = new ArrayList<>();
+        for (int i = 0; i < randomIntBetween(0, 10); ++i) {
+            transformConfigWithoutTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig());
+        }
+
+        List<DataFrameTransformConfig> transformConfigWithTasks = new ArrayList<>(transformsStateAndStats.size());
+        transformsStateAndStats.forEach(stats ->
+            transformConfigWithTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig(stats.getId())));
+
+        List<DataFrameTransformConfig> allConfigs = new ArrayList<>(transformConfigWithoutTasks.size() + transformConfigWithTasks.size());
+        allConfigs.addAll(transformConfigWithoutTasks);
+        allConfigs.addAll(transformConfigWithTasks);
+
         GetDataFrameTransformsStatsAction.Response mockResponse = new GetDataFrameTransformsStatsAction.Response(transformsStateAndStats);
+        GetDataFrameTransformsAction.Response mockTransformsResponse = new GetDataFrameTransformsAction.Response(allConfigs);
 
         doAnswer(invocationOnMock -> {
             @SuppressWarnings("unchecked")
@@ -89,6 +107,14 @@ public class DataFrameFeatureSetTests extends ESTestCase {
             return Void.TYPE;
         }).when(client).execute(same(GetDataFrameTransformsStatsAction.INSTANCE), any(), any());
 
+        doAnswer(invocationOnMock -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<GetDataFrameTransformsAction.Response> listener =
+                (ActionListener<GetDataFrameTransformsAction.Response>) invocationOnMock.getArguments()[2];
+            listener.onResponse(mockTransformsResponse);
+            return Void.TYPE;
+        }).when(client).execute(same(GetDataFrameTransformsAction.INSTANCE), any(), any());
+
         PlainActionFuture<Usage> future = new PlainActionFuture<>();
         featureSet.usage(future);
         XPackFeatureSet.Usage usage = future.get();
@@ -101,16 +127,18 @@ public class DataFrameFeatureSetTests extends ESTestCase {
             Map<String, Object> usageAsMap = parser.map();
             assertTrue((boolean) XContentMapValues.extractValue("available", usageAsMap));
 
-            if (transformsStateAndStats.isEmpty()) {
+            if (transformsStateAndStats.isEmpty() && transformConfigWithoutTasks.isEmpty()) {
                 // no transforms, no stats
                 assertEquals(null, XContentMapValues.extractValue("transforms", usageAsMap));
                 assertEquals(null, XContentMapValues.extractValue("stats", usageAsMap));
             } else {
-                assertEquals(transformsStateAndStats.size(), XContentMapValues.extractValue("transforms._all", usageAsMap));
+                assertEquals(transformsStateAndStats.size() + transformConfigWithoutTasks.size(),
+                    XContentMapValues.extractValue("transforms._all", usageAsMap));
 
                 Map<String, Integer> stateCounts = new HashMap<>();
                 transformsStateAndStats.stream().map(x -> x.getTransformState().getIndexerState().value())
                         .forEach(x -> stateCounts.merge(x, 1, Integer::sum));
+                transformConfigWithoutTasks.forEach(ignored -> stateCounts.merge(IndexerState.STOPPED.value(), 1, Integer::sum));
                 stateCounts.forEach((k, v) -> assertEquals(v, XContentMapValues.extractValue("transforms." + k, usageAsMap)));
 
                 DataFrameIndexerTransformStats combinedStats = transformsStateAndStats.stream().map(x -> x.getTransformStats())

+ 8 - 11
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

@@ -22,6 +22,13 @@ setup:
   - match: { count: 0 }
   - match: { transforms: [] }
 
+---
+"Test get transform when it does not exist":
+  - do:
+      catch: /Transform with id \[missing-transform-id\] could not be found/
+      data_frame.get_data_frame_transform:
+        transform_id: "missing-transform-id"
+
 ---
 "Test delete transform when it does not exist":
   - do:
@@ -32,7 +39,7 @@ setup:
 ---
 "Test put transform with invalid source index":
   - do:
-      catch: /Failed to validate data frame configuration/
+      catch: /Source index \[missing-index\] does not exist/
       data_frame.put_data_frame_transform:
         transform_id: "missing-source-transform"
         body: >
@@ -98,13 +105,3 @@ setup:
   - match: { count: 2 }
   - match: { transforms.0.id: "airline-transform" }
   - match: { transforms.1.id: "airline-transform-dos" }
-
-  - do:
-      data_frame.delete_data_frame_transform:
-        transform_id: "airline-transform"
-  - match: { acknowledged: true }
-
-  - do:
-      data_frame.delete_data_frame_transform:
-        transform_id: "airline-transform-dos"
-  - match: { acknowledged: true }

+ 38 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_start_stop.yml

@@ -63,6 +63,44 @@ teardown:
       data_frame.start_data_frame_transform:
         transform_id: "airline-transform-start-stop"
 
+---
+"Test start/stop/start transform":
+  - do:
+      data_frame.start_data_frame_transform:
+        transform_id: "airline-transform-start-stop"
+  - match: { started: true }
+
+  - do:
+      data_frame.get_data_frame_transform_stats:
+        transform_id: "airline-transform-start-stop"
+  - match: { count: 1 }
+  - match: { transforms.0.id: "airline-transform-start-stop" }
+  - match: { transforms.0.state.transform_state: "started" }
+
+  - do:
+      data_frame.stop_data_frame_transform:
+        transform_id: "airline-transform-start-stop"
+  - match: { stopped: true }
+
+  - do:
+      data_frame.get_data_frame_transform_stats:
+        transform_id: "airline-transform-start-stop"
+  - match: { count: 1 }
+  - match: { transforms.0.id: "airline-transform-start-stop" }
+  - match: { transforms.0.state.transform_state: "stopped" }
+
+  - do:
+      data_frame.start_data_frame_transform:
+        transform_id: "airline-transform-start-stop"
+  - match: { started: true }
+
+  - do:
+      data_frame.get_data_frame_transform_stats:
+        transform_id: "airline-transform-start-stop"
+  - match: { count: 1 }
+  - match: { transforms.0.id: "airline-transform-start-stop" }
+  - match: { transforms.0.state.transform_state: "started" }
+
 ---
 "Test stop missing transform":
   - do:

+ 14 - 1
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

@@ -25,9 +25,16 @@ setup:
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
             }
           }
+  - do:
+      data_frame.start_data_frame_transform:
+        transform_id: "airline-transform-stats"
 
 ---
 teardown:
+  - do:
+      data_frame.stop_data_frame_transform:
+        transform_id: "airline-transform-stats"
+
   - do:
       data_frame.delete_data_frame_transform:
         transform_id: "airline-transform-stats"
@@ -39,7 +46,7 @@ teardown:
         transform_id: "airline-transform-stats"
   - match: { count: 1 }
   - match: { transforms.0.id: "airline-transform-stats" }
-  - match: { transforms.0.state.transform_state: "stopped" }
+  - match: { transforms.0.state.transform_state: "started" }
   - match: { transforms.0.state.generation: 0 }
   - match: { transforms.0.stats.pages_processed: 0 }
   - match: { transforms.0.stats.documents_processed: 0 }
@@ -74,6 +81,9 @@ teardown:
               "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
             }
           }
+  - do:
+      data_frame.start_data_frame_transform:
+        transform_id: "airline-transform-stats-dos"
   - do:
       data_frame.get_data_frame_transform_stats:
         transform_id: "*"
@@ -88,6 +98,9 @@ teardown:
   - match: { transforms.0.id: "airline-transform-stats" }
   - match: { transforms.1.id: "airline-transform-stats-dos" }
 
+  - do:
+      data_frame.stop_data_frame_transform:
+        transform_id: "airline-transform-stats-dos"
   - do:
       data_frame.delete_data_frame_transform:
         transform_id: "airline-transform-stats-dos"