Browse Source

[ML] Refactor GET Transforms API (#40015)

* [Data Frame] Refactor GET Transforms API:

* Add pagination
* comma delimited list expression support GET transforms
* Flag troublesome internal code for future refactor

* Removing `allow_no_transforms` param, ratcheting down pageparam option

* Changing  DataFrameFeatureSet#usage to not get all configs

* Intermediate commit

* Writing test for batch data gatherer

* Removing unused import

* removing bad println used for debugging

* Updating BatchedDataIterator comments and query

* addressing pr comments

* disallow null scrollId to cause stackoverflow
Benjamin Trent 6 years ago
parent
commit
70615dd38d
14 changed files with 1037 additions and 211 deletions
  1. 30 80
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java
  2. 7 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java
  3. 9 28
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java
  4. 59 10
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java
  5. 142 5
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java
  6. 2 73
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java
  7. 9 1
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestGetDataFrameTransformsAction.java
  8. 186 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIterator.java
  9. 125 0
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/util/TypedChainTaskExecutor.java
  10. 12 12
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java
  11. 339 0
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIteratorTests.java
  12. 14 2
      x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.get_data_frame_transform.json
  13. 38 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml
  14. 65 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

+ 30 - 80
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/GetDataFrameTransformsAction.java

@@ -8,29 +8,28 @@ package org.elasticsearch.xpack.core.dataframe.action;
 
 import org.apache.logging.log4j.LogManager;
 import org.elasticsearch.action.Action;
-import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.ActionRequestValidationException;
-import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.client.ElasticsearchClient;
-import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.ParseField;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.action.AbstractGetResourcesRequest;
+import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse;
+import org.elasticsearch.xpack.core.action.util.PageParams;
+import org.elasticsearch.xpack.core.action.util.QueryPage;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
+
+import static org.elasticsearch.action.ValidateActions.addValidationError;
 
 public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsAction.Response>{
 
@@ -49,61 +48,45 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
         return new Response();
     }
 
-    public static class Request extends ActionRequest implements ToXContent {
-        private String id;
+    public static class Request extends AbstractGetResourcesRequest implements ToXContent {
+
+        private static final int MAX_SIZE_RETURN = 1000;
 
         public Request(String id) {
-            if (Strings.isNullOrEmpty(id) || id.equals("*")) {
-                this.id = MetaData.ALL;
-            } else {
-                this.id = id;
-            }
+            super(id, PageParams.defaultParams(), true);
         }
 
         public Request() {
+            super(null, PageParams.defaultParams(), true);
         }
 
         public Request(StreamInput in) throws IOException {
-            super(in);
-            id = in.readString();
+            readFrom(in);
         }
 
         public String getId() {
-            return id;
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeString(id);
+            return getResourceId();
         }
 
         @Override
         public ActionRequestValidationException validate() {
-            return null;
+            ActionRequestValidationException exception = null;
+            if (getPageParams() != null && getPageParams().getSize() > MAX_SIZE_RETURN) {
+                exception = addValidationError("Param [" + PageParams.SIZE.getPreferredName() +
+                    "] has a max acceptable value of [" + MAX_SIZE_RETURN + "]", exception);
+            }
+            return exception;
         }
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            builder.field(DataFrameField.ID.getPreferredName(), id);
+            builder.field(DataFrameField.ID.getPreferredName(), getResourceId());
             return builder;
         }
 
         @Override
-        public int hashCode() {
-            return Objects.hash(id);
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (obj == null) {
-                return false;
-            }
-            if (getClass() != obj.getClass()) {
-                return false;
-            }
-            Request other = (Request) obj;
-            return Objects.equals(id, other.id);
+        public String getResourceIdField() {
+            return DataFrameField.ID.getPreferredName();
         }
     }
 
@@ -114,19 +97,17 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
         }
     }
 
-    public static class Response extends ActionResponse implements Writeable, ToXContentObject {
+    public static class Response extends AbstractGetResourcesResponse<DataFrameTransformConfig> implements Writeable, ToXContentObject {
 
         public static final String INVALID_TRANSFORMS_DEPRECATION_WARNING = "Found [{}] invalid transforms";
         private static final ParseField INVALID_TRANSFORMS = new ParseField("invalid_transforms");
 
-        private List<DataFrameTransformConfig> transformConfigurations;
-
         public Response(List<DataFrameTransformConfig> transformConfigs) {
-            this.transformConfigurations = transformConfigs;
+            super(new QueryPage<>(transformConfigs, transformConfigs.size(), DataFrameField.TRANSFORMS));
         }
 
         public Response() {
-            this.transformConfigurations = Collections.emptyList();
+            super();
         }
 
         public Response(StreamInput in) throws IOException {
@@ -134,30 +115,18 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
         }
 
         public List<DataFrameTransformConfig> getTransformConfigurations() {
-            return transformConfigurations;
-        }
-
-        @Override
-        public void readFrom(StreamInput in) throws IOException {
-            super.readFrom(in);
-            transformConfigurations = in.readList(DataFrameTransformConfig::new);
-        }
-
-        @Override
-        public void writeTo(StreamOutput out) throws IOException {
-            super.writeTo(out);
-            out.writeList(transformConfigurations);
+            return getResources().results();
         }
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             List<String> invalidTransforms = new ArrayList<>();
             builder.startObject();
-            builder.field(DataFrameField.COUNT.getPreferredName(), transformConfigurations.size());
+            builder.field(DataFrameField.COUNT.getPreferredName(), getResources().count());
             // XContentBuilder does not support passing the params object for Iterables
             builder.field(DataFrameField.TRANSFORMS.getPreferredName());
             builder.startArray();
-            for (DataFrameTransformConfig configResponse : transformConfigurations) {
+            for (DataFrameTransformConfig configResponse : getResources().results()) {
                 configResponse.toXContent(builder, params);
                 if (configResponse.isValid() == false) {
                     invalidTransforms.add(configResponse.getId());
@@ -177,27 +146,8 @@ public class GetDataFrameTransformsAction extends Action<GetDataFrameTransformsA
         }
 
         @Override
-        public int hashCode() {
-            return Objects.hash(transformConfigurations);
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            if (this == other) {
-                return true;
-            }
-
-            if (other == null || getClass() != other.getClass()) {
-                return false;
-            }
-
-            final Response that = (Response) other;
-            return Objects.equals(this.transformConfigurations, that.transformConfigurations);
-        }
-
-        @Override
-        public final String toString() {
-            return Strings.toString(this);
+        protected Reader<DataFrameTransformConfig> getReader() {
+            return DataFrameTransformConfig::new;
         }
     }
 }

+ 7 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformStateAndStats.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
+import org.elasticsearch.xpack.core.indexing.IndexerState;
 
 import java.io.IOException;
 import java.util.Objects;
@@ -38,6 +39,12 @@ public class DataFrameTransformStateAndStats implements Writeable, ToXContentObj
                 DataFrameField.STATS_FIELD);
     }
 
+    public static DataFrameTransformStateAndStats initialStateAndStats(String id) {
+        return new DataFrameTransformStateAndStats(id,
+            new DataFrameTransformState(IndexerState.STOPPED, null, 0),
+            new DataFrameIndexerTransformStats());
+    }
+
     public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats) {
         this.id = Objects.requireNonNull(id);
         this.transformState = Objects.requireNonNull(state);

+ 9 - 28
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSet.java

@@ -17,20 +17,16 @@ import org.elasticsearch.xpack.core.XPackFeatureSet;
 import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.XPackSettings;
 import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
-import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
-import org.elasticsearch.xpack.core.indexing.IndexerState;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
+
 
 public class DataFrameFeatureSet implements XPackFeatureSet {
 
@@ -78,39 +74,24 @@ public class DataFrameFeatureSet implements XPackFeatureSet {
             return;
         }
 
-        GetDataFrameTransformsAction.Request transformsRequest = new GetDataFrameTransformsAction.Request(MetaData.ALL);
-        client.execute(GetDataFrameTransformsAction.INSTANCE, transformsRequest, ActionListener.wrap(
-                transforms -> {
-                    GetDataFrameTransformsStatsAction.Request transformStatsRequest =
-                            new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
-                    client.execute(GetDataFrameTransformsStatsAction.INSTANCE, transformStatsRequest,
-                            ActionListener.wrap(transformStatsResponse -> {
-                                listener.onResponse(createUsage(available(), enabled(), transforms.getTransformConfigurations(),
-                                        transformStatsResponse.getTransformsStateAndStats()));
-                            }, listener::onFailure));
-                },
-                listener::onFailure
-        ));
+        final GetDataFrameTransformsStatsAction.Request transformStatsRequest = new GetDataFrameTransformsStatsAction.Request(MetaData.ALL);
+        client.execute(GetDataFrameTransformsStatsAction.INSTANCE,
+            transformStatsRequest,
+            ActionListener.wrap(transformStatsResponse ->
+                listener.onResponse(createUsage(available(), enabled(), transformStatsResponse.getTransformsStateAndStats())),
+                listener::onFailure));
     }
 
-    static DataFrameFeatureSetUsage createUsage(boolean available, boolean enabled,
-                                                List<DataFrameTransformConfig> transforms,
+    static DataFrameFeatureSetUsage createUsage(boolean available,
+                                                boolean enabled,
                                                 List<DataFrameTransformStateAndStats> transformsStateAndStats) {
 
-        Set<String> transformIds = transforms.stream()
-                .map(DataFrameTransformConfig::getId)
-                .collect(Collectors.toSet());
-
         Map<String, Long> transformsCountByState = new HashMap<>();
         DataFrameIndexerTransformStats accumulatedStats = new DataFrameIndexerTransformStats();
-
         transformsStateAndStats.forEach(singleResult -> {
-            transformIds.remove(singleResult.getId());
             transformsCountByState.merge(singleResult.getTransformState().getIndexerState().value(), 1L, Long::sum);
             accumulatedStats.merge(singleResult.getTransformStats());
         });
-        // If there is no state returned, assumed stopped
-        transformIds.forEach(ignored -> transformsCountByState.merge(IndexerState.STOPPED.value(), 1L, Long::sum));
 
         return new DataFrameFeatureSetUsage(available, enabled, transformsCountByState, accumulatedStats);
     }

+ 59 - 10
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsAction.java

@@ -6,35 +6,84 @@
 
 package org.elasticsearch.xpack.dataframe.action;
 
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
-import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.ClientHelper;
+import org.elasticsearch.xpack.core.action.AbstractTransportGetResourcesAction;
+import org.elasticsearch.xpack.core.dataframe.DataFrameField;
+import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Request;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction.Response;
-import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
 
+import java.io.IOException;
 
-public class TransportGetDataFrameTransformsAction extends HandledTransportAction<Request, Response> {
+import static org.elasticsearch.xpack.core.dataframe.DataFrameField.INDEX_DOC_TYPE;
 
-    private final DataFrameTransformsConfigManager transformsConfigManager;
+
+public class TransportGetDataFrameTransformsAction extends AbstractTransportGetResourcesAction<DataFrameTransformConfig,
+                                                                                               Request,
+                                                                                               Response> {
 
     @Inject
     public TransportGetDataFrameTransformsAction(TransportService transportService, ActionFilters actionFilters,
-                                                 DataFrameTransformsConfigManager transformsConfigManager) {
-        super(GetDataFrameTransformsAction.NAME, transportService, actionFilters, () -> new Request());
-        this.transformsConfigManager = transformsConfigManager;
+                                                 Client client, NamedXContentRegistry xContentRegistry) {
+        super(GetDataFrameTransformsAction.NAME, transportService, actionFilters, Request::new, client, xContentRegistry);
     }
 
     @Override
     protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
-        //TODO support comma delimited and simple regex IDs
-        transformsConfigManager.getTransformConfigurations(request.getId(), ActionListener.wrap(
-            configs -> listener.onResponse(new Response(configs)),
+        searchResources(request, ActionListener.wrap(
+            r -> listener.onResponse(new Response(r.results())),
             listener::onFailure
         ));
     }
+
+    @Override
+    protected ParseField getResultsField() {
+        return DataFrameField.TRANSFORMS;
+    }
+
+    @Override
+    protected String[] getIndices() {
+        return new String[]{DataFrameInternalIndex.INDEX_NAME};
+    }
+
+    @Override
+    protected DataFrameTransformConfig parse(XContentParser parser) throws IOException {
+        return DataFrameTransformConfig.fromXContent(parser, null, true);
+    }
+
+    @Override
+    protected ResourceNotFoundException notFoundException(String resourceId) {
+        return new ResourceNotFoundException(
+            DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, resourceId));
+    }
+
+    @Override
+    protected String executionOrigin() {
+        return ClientHelper.DATA_FRAME_ORIGIN;
+    }
+
+    @Override
+    protected String extractIdFromResource(DataFrameTransformConfig transformConfig) {
+        return transformConfig.getId();
+    }
+
+    @Override
+    protected QueryBuilder additionalQuery() {
+        return QueryBuilders.termQuery(INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME);
+    }
 }

+ 142 - 5
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportGetDataFrameTransformsStatsAction.java

@@ -6,31 +6,54 @@
 
 package org.elasticsearch.xpack.dataframe.action;
 
+import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.TaskOperationFailure;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.tasks.TransportTasksAction;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.discovery.MasterNotDiscoveredException;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Request;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction.Response;
+import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
 import org.elasticsearch.xpack.dataframe.persistence.DataFramePersistentTaskUtils;
+import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
 import org.elasticsearch.xpack.dataframe.transforms.DataFrameTransformTask;
+import org.elasticsearch.xpack.dataframe.util.BatchedDataIterator;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class TransportGetDataFrameTransformsStatsAction extends
@@ -39,11 +62,16 @@ public class TransportGetDataFrameTransformsStatsAction extends
         GetDataFrameTransformsStatsAction.Response,
         GetDataFrameTransformsStatsAction.Response> {
 
+    private final Client client;
+    private final DataFrameTransformsConfigManager dataFrameTransformsConfigManager;
     @Inject
     public TransportGetDataFrameTransformsStatsAction(TransportService transportService, ActionFilters actionFilters,
-            ClusterService clusterService) {
+                                                      ClusterService clusterService, Client client,
+                                                      DataFrameTransformsConfigManager dataFrameTransformsConfigManager) {
         super(GetDataFrameTransformsStatsAction.NAME, clusterService, transportService, actionFilters, Request::new, Response::new,
                 Response::new, ThreadPool.Names.SAME);
+        this.client = client;
+        this.dataFrameTransformsConfigManager = dataFrameTransformsConfigManager;
     }
 
     @Override
@@ -80,11 +108,14 @@ public class TransportGetDataFrameTransformsStatsAction extends
 
         if (nodes.isLocalNodeElectedMaster()) {
             if (DataFramePersistentTaskUtils.stateHasDataFrameTransforms(request.getId(), state)) {
-                super.doExecute(task, request, listener);
+                ActionListener<Response> transformStatsListener = ActionListener.wrap(
+                    response -> collectStatsForTransformsWithoutTasks(request, response, listener),
+                    listener::onFailure
+                );
+                super.doExecute(task, request, transformStatsListener);
             } else {
-                // If we couldn't find the transform in the persistent task CS, it means it was deleted prior to this GET
-                // and we can just send an empty response, no need to go looking for the allocated task
-                listener.onResponse(new Response(Collections.emptyList()));
+                // If we don't have any tasks, pass empty collection to this method
+                collectStatsForTransformsWithoutTasks(request, new Response(Collections.emptyList()), listener);
             }
 
         } else {
@@ -99,4 +130,110 @@ public class TransportGetDataFrameTransformsStatsAction extends
             }
         }
     }
+
+    // TODO correct when we start storing stats in docs, right now, just return STOPPED and empty stats
+    private void collectStatsForTransformsWithoutTasks(Request request,
+                                                       Response response,
+                                                       ActionListener<Response> listener) {
+        if (request.getId().equals(MetaData.ALL) == false) {
+            // If we did not find any tasks && this is NOT for ALL, verify that the single config exists, and return as stopped
+            // Empty other wise
+            if (response.getTransformsStateAndStats().isEmpty()) {
+                dataFrameTransformsConfigManager.getTransformConfiguration(request.getId(), ActionListener.wrap(
+                    config ->
+                        listener.onResponse(
+                            new Response(Collections.singletonList(DataFrameTransformStateAndStats.initialStateAndStats(config.getId())))),
+                    exception -> {
+                        if (exception instanceof ResourceNotFoundException) {
+                            listener.onResponse(new Response(Collections.emptyList()));
+                        } else {
+                            listener.onFailure(exception);
+                        }
+                    }
+                ));
+            } else {
+                // If it was not ALL && we DO have stored stats, simply return those as we found them all, since we only support 1 or all
+                listener.onResponse(response);
+            }
+            return;
+        }
+        // We only do this mass collection if we are getting ALL tasks
+        TransformIdCollector collector = new TransformIdCollector();
+        collector.execute(ActionListener.wrap(
+            allIds -> {
+                response.getTransformsStateAndStats().forEach(
+                    tsas -> allIds.remove(tsas.getId())
+                );
+                List<DataFrameTransformStateAndStats> statsWithoutTasks = allIds.stream()
+                    .map(DataFrameTransformStateAndStats::initialStateAndStats)
+                    .collect(Collectors.toList());
+                statsWithoutTasks.addAll(response.getTransformsStateAndStats());
+                statsWithoutTasks.sort(Comparator.comparing(DataFrameTransformStateAndStats::getId));
+                listener.onResponse(new Response(statsWithoutTasks));
+            },
+            listener::onFailure
+        ));
+    }
+
+    /**
+     * This class recursively queries a scroll search over all transform_ids and puts them in a set
+     */
+    private class TransformIdCollector extends BatchedDataIterator<String, Set<String>> {
+
+        private final Set<String> ids = new HashSet<>();
+        TransformIdCollector() {
+            super(client, DataFrameInternalIndex.INDEX_NAME);
+        }
+
+        void execute(final ActionListener<Set<String>> finalListener) {
+            if (this.hasNext()) {
+                next(ActionListener.wrap(
+                    setOfIds -> execute(finalListener),
+                    finalListener::onFailure
+                ));
+            } else {
+                finalListener.onResponse(ids);
+            }
+        }
+
+        @Override
+        protected QueryBuilder getQuery() {
+            return QueryBuilders.boolQuery()
+                .filter(QueryBuilders.termQuery(DataFrameField.INDEX_DOC_TYPE.getPreferredName(), DataFrameTransformConfig.NAME));
+        }
+
+        @Override
+        protected String map(SearchHit hit) {
+            BytesReference source = hit.getSourceRef();
+            try (InputStream stream = source.streamInput();
+                 XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY,
+                     LoggingDeprecationHandler.INSTANCE, stream)) {
+                return (String)parser.map().get(DataFrameField.ID.getPreferredName());
+            } catch (IOException e) {
+                throw new ElasticsearchParseException("failed to parse bucket", e);
+            }
+        }
+
+        @Override
+        protected Set<String> getCollection() {
+            return ids;
+        }
+
+        @Override
+        protected SortOrder sortOrder() {
+            return SortOrder.ASC;
+        }
+
+        @Override
+        protected String sortField() {
+            return DataFrameField.ID.getPreferredName();
+        }
+
+        @Override
+        protected FetchSourceContext getFetchSourceContext() {
+            return new FetchSourceContext(true, new String[]{DataFrameField.ID.getPreferredName()}, new String[]{});
+        }
+    }
+
+
 }

+ 2 - 73
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/persistence/DataFrameTransformsConfigManager.java

@@ -20,12 +20,8 @@ import org.elasticsearch.action.get.GetAction;
 import org.elasticsearch.action.get.GetRequest;
 import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -36,22 +32,14 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.sort.SortOrder;
-import org.elasticsearch.xpack.core.ClientHelper;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
 
 import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
 import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
@@ -59,7 +47,6 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
 public class DataFrameTransformsConfigManager {
 
     private static final Logger logger = LogManager.getLogger(DataFrameTransformsConfigManager.class);
-    private static final int DEFAULT_SIZE = 100;
 
     public static final Map<String, String> TO_XCONTENT_PARAMS = Collections.singletonMap(DataFrameField.FOR_INTERNAL_STORAGE, "true");
 
@@ -102,6 +89,8 @@ public class DataFrameTransformsConfigManager {
         }
     }
 
+    // For transforms returned via GET data_frame/transforms, see the TransportGetDataFrameTransformsAction
+    // This function is only for internal use.
     public void getTransformConfiguration(String transformId, ActionListener<DataFrameTransformConfig> resultListener) {
         GetRequest getRequest = new GetRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId));
         executeAsyncWithOrigin(client, DATA_FRAME_ORIGIN, GetAction.INSTANCE, getRequest, ActionListener.wrap(getResponse -> {
@@ -123,54 +112,6 @@ public class DataFrameTransformsConfigManager {
         }));
     }
 
-    /**
-     * Get more than one DataFrameTransformConfig
-     *
-     * @param transformId Can be a single transformId, `*`, or `_all`
-     * @param resultListener Listener to alert when request is completed
-     */
-    // TODO add pagination support
-    public void getTransformConfigurations(String transformId,
-                                           ActionListener<List<DataFrameTransformConfig>> resultListener) {
-        final boolean isAllOrWildCard = Strings.isAllOrWildcard(new String[]{transformId});
-        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
-            .filter(QueryBuilders.termQuery("doc_type", DataFrameTransformConfig.NAME));
-        if (isAllOrWildCard == false) {
-            queryBuilder.filter(QueryBuilders.termQuery(DataFrameField.ID.getPreferredName(), transformId));
-        }
-
-        SearchRequest request = client.prepareSearch(DataFrameInternalIndex.INDEX_NAME)
-            .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
-            .setTrackTotalHits(true)
-            .addSort(DataFrameField.ID.getPreferredName(), SortOrder.ASC)
-            .setSize(DEFAULT_SIZE)
-            .setQuery(queryBuilder)
-            .request();
-
-        ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request,
-            ActionListener.<SearchResponse>wrap(
-                searchResponse -> {
-                    List<DataFrameTransformConfig> configs = new ArrayList<>(searchResponse.getHits().getHits().length);
-                    for (SearchHit hit : searchResponse.getHits().getHits()) {
-                        DataFrameTransformConfig config = parseTransformLenientlyFromSourceSync(hit.getSourceRef(),
-                            resultListener::onFailure);
-                        if (config == null) {
-                            return;
-                        }
-                        configs.add(config);
-                    }
-                    if (configs.isEmpty() && (isAllOrWildCard == false)) {
-                        resultListener.onFailure(new ResourceNotFoundException(
-                            DataFrameMessages.getMessage(DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, transformId)));
-                        return;
-                    }
-                    resultListener.onResponse(configs);
-                },
-                resultListener::onFailure)
-        , client::search);
-
-    }
-
     public void deleteTransformConfiguration(String transformId, ActionListener<Boolean> listener) {
         DeleteRequest request = new DeleteRequest(DataFrameInternalIndex.INDEX_NAME, DataFrameTransformConfig.documentId(transformId));
         request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
@@ -193,18 +134,6 @@ public class DataFrameTransformsConfigManager {
         }));
     }
 
-    private DataFrameTransformConfig parseTransformLenientlyFromSourceSync(BytesReference source, Consumer<Exception> onFailure) {
-        try (InputStream stream = source.streamInput();
-             XContentParser parser = XContentFactory.xContent(XContentType.JSON)
-                 .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)) {
-            return DataFrameTransformConfig.fromXContent(parser, null, true);
-        } catch (Exception e) {
-            logger.error(DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_PARSE_TRANSFORM_CONFIGURATION), e);
-            onFailure.accept(e);
-            return null;
-        }
-    }
-
     private void parseTransformLenientlyFromSource(BytesReference source, String transformId,
             ActionListener<DataFrameTransformConfig> transformListener) {
         try (InputStream stream = source.streamInput();

+ 9 - 1
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestGetDataFrameTransformsAction.java

@@ -12,6 +12,7 @@ import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.action.util.PageParams;
 import org.elasticsearch.xpack.core.dataframe.DataFrameField;
 import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
 
@@ -25,8 +26,15 @@ public class RestGetDataFrameTransformsAction extends BaseRestHandler {
 
     @Override
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
+        GetDataFrameTransformsAction.Request request = new GetDataFrameTransformsAction.Request();
+
         String id = restRequest.param(DataFrameField.ID.getPreferredName());
-        GetDataFrameTransformsAction.Request request = new GetDataFrameTransformsAction.Request(id);
+        request.setResourceId(id);
+        if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
+            request.setPageParams(
+                new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
+                    restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)));
+        }
         return channel -> client.execute(GetDataFrameTransformsAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 

+ 186 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIterator.java

@@ -0,0 +1,186 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.dataframe.util;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.ClearScrollResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.xpack.core.ClientHelper;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Provides basic tools around scrolling over documents and gathering the data in some Collection
+ * @param <T> The object type that is being collected
+ * @param <E> The collection that should be used (i.e. Set, Deque, etc.)
+ */
+public abstract class BatchedDataIterator<T, E extends Collection<T>> {
+    private static final Logger LOGGER = LogManager.getLogger(BatchedDataIterator.class);
+
+    private static final String CONTEXT_ALIVE_DURATION = "5m";
+    private static final int BATCH_SIZE = 10_000;
+
+    private final Client client;
+    private final String index;
+    private volatile long count;
+    private volatile long totalHits;
+    private volatile String scrollId;
+    private volatile boolean isScrollInitialised;
+
+    protected BatchedDataIterator(Client client, String index) {
+        this.client = Objects.requireNonNull(client);
+        this.index = Objects.requireNonNull(index);
+        this.totalHits = 0;
+        this.count = 0;
+    }
+
+    /**
+     * Returns {@code true} if the iteration has more elements.
+     * (In other words, returns {@code true} if {@link #next} would
+     * return an element rather than throwing an exception.)
+     *
+     * @return {@code true} if the iteration has more elements
+     */
+    public boolean hasNext() {
+        return !isScrollInitialised || count != totalHits;
+    }
+
+    /**
+     * The first time next() is called, the search will be performed and the first
+     * batch will be given to the listener. Any subsequent call will return the following batches.
+     * <p>
+     * Note that in some implementations it is possible that when there are no
+     * results at all. {@link BatchedDataIterator#hasNext()} will return {@code true} the first time it is called but then a call
+     * to this function returns an empty Collection to the listener.
+     */
+    public void next(ActionListener<E> listener) {
+        if (!hasNext()) {
+            listener.onFailure(new NoSuchElementException());
+        }
+
+        if (!isScrollInitialised) {
+            ActionListener<SearchResponse> wrappedListener = ActionListener.wrap(
+                searchResponse -> {
+                    isScrollInitialised = true;
+                    totalHits = searchResponse.getHits().getTotalHits().value;
+                    scrollId = searchResponse.getScrollId();
+                    mapHits(searchResponse, listener);
+                },
+                listener::onFailure
+            );
+            initScroll(wrappedListener);
+        } else {
+            ActionListener<SearchResponse> wrappedListener = ActionListener.wrap(
+                searchResponse -> {
+                    scrollId = searchResponse.getScrollId();
+                    mapHits(searchResponse, listener);
+                },
+                listener::onFailure
+            );
+            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(CONTEXT_ALIVE_DURATION);
+            ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+                ClientHelper.DATA_FRAME_ORIGIN,
+                searchScrollRequest,
+                wrappedListener,
+                client::searchScroll);
+        }
+    }
+
+    private void initScroll(ActionListener<SearchResponse> listener) {
+        LOGGER.trace("ES API CALL: search index {}", index);
+
+        SearchRequest searchRequest = new SearchRequest(index);
+        searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
+        searchRequest.scroll(CONTEXT_ALIVE_DURATION);
+        searchRequest.source(new SearchSourceBuilder()
+            .fetchSource(getFetchSourceContext())
+            .size(getBatchSize())
+            .query(getQuery())
+            .trackTotalHits(true)
+            .sort(sortField(), sortOrder()));
+
+        ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+            ClientHelper.DATA_FRAME_ORIGIN,
+            searchRequest,
+            listener,
+            client::search);
+    }
+
+    private void mapHits(SearchResponse searchResponse, ActionListener<E> mappingListener) {
+        E results = getCollection();
+
+        SearchHit[] hits = searchResponse.getHits().getHits();
+        for (SearchHit hit : hits) {
+            T mapped = map(hit);
+            if (mapped != null) {
+                results.add(mapped);
+            }
+        }
+        count += hits.length;
+
+        if (!hasNext() && scrollId != null) {
+            ClearScrollRequest request = client.prepareClearScroll().setScrollIds(Collections.singletonList(scrollId)).request();
+            ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+                ClientHelper.DATA_FRAME_ORIGIN,
+                request,
+                ActionListener.<ClearScrollResponse>wrap(
+                    r -> mappingListener.onResponse(results),
+                    mappingListener::onFailure
+                ),
+                client::clearScroll);
+        } else {
+            mappingListener.onResponse(results);
+        }
+    }
+
+    /**
+     * Get the query to use for the search
+     * @return the search query
+     */
+    protected abstract QueryBuilder getQuery();
+
+    /**
+     * Maps the search hit to the document type
+     * @param hit the search hit
+     * @return The mapped document or {@code null} if the mapping failed
+     */
+    protected abstract T map(SearchHit hit);
+
+    protected abstract E getCollection();
+
+    protected abstract SortOrder sortOrder();
+
+    protected abstract String sortField();
+
+    /**
+     * Should we fetch the source and what fields specifically.
+     *
+     * Defaults to all fields and true.
+     */
+    protected FetchSourceContext getFetchSourceContext() {
+        return FetchSourceContext.FETCH_SOURCE;
+    }
+
+    protected int getBatchSize() {
+        return BATCH_SIZE;
+    }
+}

+ 125 - 0
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/util/TypedChainTaskExecutor.java

@@ -0,0 +1,125 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.dataframe.util;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Predicate;
+
+/**
+ * A utility that allows chained (serial) execution of a number of tasks
+ * in async manner.
+ */
+public class TypedChainTaskExecutor<T> {
+
+    public interface ChainTask <T> {
+        void run(ActionListener<T> listener);
+    }
+
+    private final ExecutorService executorService;
+    private final LinkedList<ChainTask<T>> tasks = new LinkedList<>();
+    private final Predicate<Exception> failureShortCircuitPredicate;
+    private final Predicate<T> continuationPredicate;
+    private final List<T> collectedResponses;
+
+    /**
+     * Creates a new TypedChainTaskExecutor.
+     * Each chainedTask is executed in order serially and after each execution the continuationPredicate is tested.
+     *
+     * On failures the failureShortCircuitPredicate is tested.
+     *
+     * @param executorService The service where to execute the tasks
+     * @param continuationPredicate The predicate to test on whether to execute the next task or not.
+     *                              {@code true} means continue on to the next task.
+     *                              Must be able to handle null values.
+     * @param failureShortCircuitPredicate The predicate on whether to short circuit execution on a give exception.
+     *                                     {@code true} means that no more tasks should execute and the the listener::onFailure should be
+     *                                     called.
+     */
+    public TypedChainTaskExecutor(ExecutorService executorService,
+                                  Predicate<T> continuationPredicate,
+                                  Predicate<Exception> failureShortCircuitPredicate) {
+        this.executorService = Objects.requireNonNull(executorService);
+        this.continuationPredicate = continuationPredicate;
+        this.failureShortCircuitPredicate = failureShortCircuitPredicate;
+        this.collectedResponses = new ArrayList<>();
+    }
+
+    public synchronized void add(ChainTask<T> task) {
+        tasks.add(task);
+    }
+
+    private synchronized void execute(T previousValue, ActionListener<List<T>> listener) {
+        collectedResponses.add(previousValue);
+        if (continuationPredicate.test(previousValue)) {
+            if (tasks.isEmpty()) {
+               listener.onResponse(Collections.unmodifiableList(new ArrayList<>(collectedResponses)));
+               return;
+            }
+            ChainTask<T> task = tasks.pop();
+            executorService.execute(new AbstractRunnable() {
+                @Override
+                public void onFailure(Exception e) {
+                    if (failureShortCircuitPredicate.test(e)) {
+                        listener.onFailure(e);
+                    } else {
+                        execute(null, listener);
+                    }
+                }
+
+                @Override
+                protected void doRun() {
+                    task.run(ActionListener.wrap(value -> execute(value, listener), this::onFailure));
+                }
+            });
+        } else {
+            listener.onResponse(Collections.unmodifiableList(new ArrayList<>(collectedResponses)));
+        }
+    }
+
+    /**
+     * Execute all the chained tasks serially, notify listener when completed
+     *
+     * @param listener The ActionListener to notify when all executions have been completed,
+     *                 or when no further tasks should be executed.
+     *                 The resulting list COULD contain null values depending on if execution is continued
+     *                 on exceptions or not.
+     */
+    public synchronized void execute(ActionListener<List<T>> listener) {
+        if (tasks.isEmpty()) {
+            listener.onResponse(Collections.emptyList());
+            return;
+        }
+        collectedResponses.clear();
+        ChainTask<T> task = tasks.pop();
+        executorService.execute(new AbstractRunnable() {
+            @Override
+            public void onFailure(Exception e) {
+                if (failureShortCircuitPredicate.test(e)) {
+                    listener.onFailure(e);
+                } else {
+                    execute(null, listener);
+                }
+            }
+
+            @Override
+            protected void doRun() {
+                task.run(ActionListener.wrap(value -> execute(value, listener), this::onFailure));
+            }
+        });
+    }
+
+    public synchronized List<T> getCollectedResponses() {
+        return Collections.unmodifiableList(new ArrayList<>(collectedResponses));
+    }
+}

+ 12 - 12
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/DataFrameFeatureSetTests.java

@@ -24,7 +24,6 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfi
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformStateAndStatsTests;
-import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.junit.Before;
 
 import java.io.IOException;
@@ -83,17 +82,19 @@ public class DataFrameFeatureSetTests extends ESTestCase {
                     DataFrameTransformConfigTests.randomDataFrameTransformConfig("df-" + Integer.toString(uniqueId++)));
         }
 
-        List<DataFrameTransformConfig> transformConfigWithTasks = new ArrayList<>(transformsStateAndStats.size());
+        List<DataFrameTransformConfig> transformConfigWithTasks =
+            new ArrayList<>(transformsStateAndStats.size() + transformConfigWithoutTasks.size());
+
         transformsStateAndStats.forEach(stats ->
             transformConfigWithTasks.add(DataFrameTransformConfigTests.randomDataFrameTransformConfig(stats.getId())));
-
-        List<DataFrameTransformConfig> allConfigs = new ArrayList<>(transformConfigWithoutTasks.size() + transformConfigWithTasks.size());
-        allConfigs.addAll(transformConfigWithoutTasks);
-        allConfigs.addAll(transformConfigWithTasks);
+        transformConfigWithoutTasks.forEach(withoutTask ->
+            transformsStateAndStats.add(DataFrameTransformStateAndStats.initialStateAndStats(withoutTask.getId())));
 
         boolean enabled = randomBoolean();
         boolean available = randomBoolean();
-        DataFrameFeatureSetUsage usage = DataFrameFeatureSet.createUsage(available, enabled, allConfigs, transformsStateAndStats);
+        DataFrameFeatureSetUsage usage = DataFrameFeatureSet.createUsage(available,
+            enabled,
+            transformsStateAndStats);
 
         assertEquals(enabled, usage.enabled());
         try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
@@ -108,13 +109,12 @@ public class DataFrameFeatureSetTests extends ESTestCase {
                 assertEquals(null, XContentMapValues.extractValue("transforms", usageAsMap));
                 assertEquals(null, XContentMapValues.extractValue("stats", usageAsMap));
             } else {
-                assertEquals(transformsStateAndStats.size() + transformConfigWithoutTasks.size(),
-                    XContentMapValues.extractValue("transforms._all", usageAsMap));
+                assertEquals(transformsStateAndStats.size(), XContentMapValues.extractValue("transforms._all", usageAsMap));
 
                 Map<String, Integer> stateCounts = new HashMap<>();
-                transformsStateAndStats.stream().map(x -> x.getTransformState().getIndexerState().value())
-                        .forEach(x -> stateCounts.merge(x, 1, Integer::sum));
-                transformConfigWithoutTasks.forEach(ignored -> stateCounts.merge(IndexerState.STOPPED.value(), 1, Integer::sum));
+                transformsStateAndStats.stream()
+                    .map(x -> x.getTransformState().getIndexerState().value())
+                    .forEach(x -> stateCounts.merge(x, 1, Integer::sum));
                 stateCounts.forEach((k, v) -> assertEquals(v, XContentMapValues.extractValue("transforms." + k, usageAsMap)));
 
                 // use default constructed stats object for assertions if transformsStateAndStats is empty

+ 339 - 0
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/util/BatchedDataIteratorTests.java

@@ -0,0 +1,339 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.dataframe.util;
+
+import org.apache.lucene.search.TotalHits;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.ClearScrollRequestBuilder;
+import org.elasticsearch.action.search.ClearScrollResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.document.DocumentField;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.Before;
+import org.mockito.Mockito;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BatchedDataIteratorTests extends ESTestCase {
+
+    private static final String INDEX_NAME = "some_index_name";
+    private static final String SCROLL_ID = "someScrollId";
+
+    private Client client;
+    private boolean wasScrollCleared;
+
+    private TestIterator testIterator;
+
+    private List<SearchRequest> searchRequestCaptor = new ArrayList<>();
+    private List<SearchScrollRequest> searchScrollRequestCaptor = new ArrayList<>();
+
+    @Before
+    public void setUpMocks() {
+        ThreadPool pool = mock(ThreadPool.class);
+        ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
+        when(pool.getThreadContext()).thenReturn(threadContext);
+        client = Mockito.mock(Client.class);
+        when(client.threadPool()).thenReturn(pool);
+        wasScrollCleared = false;
+        testIterator = new TestIterator(client, INDEX_NAME);
+        givenClearScrollRequest();
+        searchRequestCaptor.clear();
+        searchScrollRequestCaptor.clear();
+    }
+
+    public void testQueryReturnsNoResults() throws Exception {
+        new ScrollResponsesMocker().finishMock();
+
+        assertTrue(testIterator.hasNext());
+        PlainActionFuture<Deque<String>> future = new PlainActionFuture<>();
+        testIterator.next(future);
+        assertTrue(future.get().isEmpty());
+        assertFalse(testIterator.hasNext());
+        assertTrue(wasScrollCleared);
+        assertSearchRequest();
+        assertSearchScrollRequests(0);
+    }
+
+    public void testCallingNextWhenHasNextIsFalseThrows() throws Exception {
+        PlainActionFuture<Deque<String>> firstFuture = new PlainActionFuture<>();
+        new ScrollResponsesMocker().addBatch(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c")).finishMock();
+        testIterator.next(firstFuture);
+        firstFuture.get();
+        assertFalse(testIterator.hasNext());
+        PlainActionFuture<Deque<String>> future = new PlainActionFuture<>();
+        ExecutionException executionException = ESTestCase.expectThrows(ExecutionException.class, () -> {
+            testIterator.next(future);
+            future.get();
+        });
+        assertNotNull(executionException.getCause());
+        assertTrue(executionException.getCause() instanceof NoSuchElementException);
+    }
+
+    public void testQueryReturnsSingleBatch() throws Exception {
+        PlainActionFuture<Deque<String>> future = new PlainActionFuture<>();
+        new ScrollResponsesMocker().addBatch(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c")).finishMock();
+
+        assertTrue(testIterator.hasNext());
+        testIterator.next(future);
+        Deque<String> batch = future.get();
+        assertEquals(3, batch.size());
+        assertTrue(batch.containsAll(Arrays.asList(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c"))));
+        assertFalse(testIterator.hasNext());
+        assertTrue(wasScrollCleared);
+
+        assertSearchRequest();
+        assertSearchScrollRequests(0);
+    }
+
+    public void testQueryReturnsThreeBatches() throws Exception {
+        PlainActionFuture<Deque<String>> future = new PlainActionFuture<>();
+        new ScrollResponsesMocker()
+        .addBatch(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c"))
+        .addBatch(createJsonDoc("d"), createJsonDoc("e"))
+        .addBatch(createJsonDoc("f"))
+        .finishMock();
+
+        assertTrue(testIterator.hasNext());
+
+        testIterator.next(future);
+        Deque<String> batch = future.get();
+        assertEquals(3, batch.size());
+        assertTrue(batch.containsAll(Arrays.asList(createJsonDoc("a"), createJsonDoc("b"), createJsonDoc("c"))));
+
+        future = new PlainActionFuture<>();
+        testIterator.next(future);
+        batch = future.get();
+        assertEquals(2, batch.size());
+        assertTrue(batch.containsAll(Arrays.asList(createJsonDoc("d"), createJsonDoc("e"))));
+
+        future = new PlainActionFuture<>();
+        testIterator.next(future);
+        batch = future.get();
+        assertEquals(1, batch.size());
+        assertTrue(batch.containsAll(Collections.singletonList(createJsonDoc("f"))));
+
+        assertFalse(testIterator.hasNext());
+        assertTrue(wasScrollCleared);
+
+        assertSearchRequest();
+        assertSearchScrollRequests(2);
+    }
+
+    private String createJsonDoc(String value) {
+        return "{\"foo\":\"" + value + "\"}";
+    }
+
+    @SuppressWarnings("unchecked")
+    private void givenClearScrollRequest() {
+        ClearScrollRequestBuilder requestBuilder = mock(ClearScrollRequestBuilder.class);
+
+        when(client.prepareClearScroll()).thenReturn(requestBuilder);
+        when(requestBuilder.setScrollIds(Collections.singletonList(SCROLL_ID))).thenReturn(requestBuilder);
+        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+        clearScrollRequest.addScrollId(SCROLL_ID);
+        when(requestBuilder.request()).thenReturn(clearScrollRequest);
+        doAnswer((answer) -> {
+            wasScrollCleared = true;
+            ActionListener<ClearScrollResponse> scrollListener =
+                (ActionListener<ClearScrollResponse>) answer.getArguments()[1];
+            scrollListener.onResponse(new ClearScrollResponse(true,0));
+            return null;
+        }).when(client).clearScroll(any(ClearScrollRequest.class), any(ActionListener.class));
+    }
+
+    private void assertSearchRequest() {
+        List<SearchRequest> searchRequests = searchRequestCaptor;
+        assertThat(searchRequests.size(), equalTo(1));
+        SearchRequest searchRequest = searchRequests.get(0);
+        assertThat(searchRequest.indices(), equalTo(new String[] {INDEX_NAME}));
+        assertThat(searchRequest.scroll().keepAlive(), equalTo(TimeValue.timeValueMinutes(5)));
+        assertThat(searchRequest.types().length, equalTo(0));
+        assertThat(searchRequest.source().query(), equalTo(QueryBuilders.matchAllQuery()));
+        assertThat(searchRequest.source().trackTotalHitsUpTo(), is(SearchContext.TRACK_TOTAL_HITS_ACCURATE));
+    }
+
+    private void assertSearchScrollRequests(int expectedCount) {
+        List<SearchScrollRequest> searchScrollRequests = searchScrollRequestCaptor;
+        assertThat(searchScrollRequests.size(), equalTo(expectedCount));
+        for (SearchScrollRequest request : searchScrollRequests) {
+            assertThat(request.scrollId(), equalTo(SCROLL_ID));
+            assertThat(request.scroll().keepAlive(), equalTo(TimeValue.timeValueMinutes(5)));
+        }
+    }
+
+    private class ScrollResponsesMocker {
+        private List<String[]> batches = new ArrayList<>();
+        private long totalHits = 0;
+        private List<SearchResponse> responses = new ArrayList<>();
+
+        ScrollResponsesMocker addBatch(String... hits) {
+            totalHits += hits.length;
+            batches.add(hits);
+            return this;
+        }
+
+        @SuppressWarnings("unchecked")
+        void finishMock() {
+            if (batches.isEmpty()) {
+                givenInitialResponse();
+                return;
+            }
+            givenInitialResponse(batches.get(0));
+            for (int i = 1; i < batches.size(); ++i) {
+                givenNextResponse(batches.get(i));
+            }
+            if (responses.size() > 0) {
+                SearchResponse first = responses.get(0);
+                if (responses.size() > 1) {
+                    List<SearchResponse> rest = new ArrayList<>(responses);
+                    Iterator<SearchResponse> responseIterator = rest.iterator();
+                    doAnswer((answer) -> {
+                        SearchScrollRequest request = (SearchScrollRequest)answer.getArguments()[0];
+                        ActionListener<SearchResponse> rsp = (ActionListener<SearchResponse>)answer.getArguments()[1];
+                        searchScrollRequestCaptor.add(request);
+                        rsp.onResponse(responseIterator.next());
+                        return null;
+                    }).when(client).searchScroll(any(SearchScrollRequest.class), any(ActionListener.class));
+                } else {
+                    doAnswer((answer) -> {
+                        SearchScrollRequest request = (SearchScrollRequest)answer.getArguments()[0];
+                        ActionListener<SearchResponse> rsp = (ActionListener<SearchResponse>)answer.getArguments()[1];
+                        searchScrollRequestCaptor.add(request);
+                        rsp.onResponse(first);
+                        return null;
+                    }).when(client).searchScroll(any(SearchScrollRequest.class), any(ActionListener.class));
+                }
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        private void givenInitialResponse(String... hits) {
+            SearchResponse searchResponse = createSearchResponseWithHits(hits);
+            doAnswer((answer) -> {
+                SearchRequest request = (SearchRequest)answer.getArguments()[0];
+                searchRequestCaptor.add(request);
+                ActionListener<SearchResponse> rsp = (ActionListener<SearchResponse>)answer.getArguments()[1];
+                rsp.onResponse(searchResponse);
+                return null;
+            }).when(client).search(any(SearchRequest.class), any(ActionListener.class));
+        }
+
+        private void givenNextResponse(String... hits) {
+            responses.add(createSearchResponseWithHits(hits));
+        }
+
+        private SearchResponse createSearchResponseWithHits(String... hits) {
+            SearchHits searchHits = createHits(hits);
+            SearchResponse searchResponse = mock(SearchResponse.class);
+            when(searchResponse.getScrollId()).thenReturn(SCROLL_ID);
+            when(searchResponse.getHits()).thenReturn(searchHits);
+            return searchResponse;
+        }
+
+        private SearchHits createHits(String... values) {
+            List<SearchHit> hits = new ArrayList<>();
+            for (String value : values) {
+                hits.add(new SearchHitBuilder(randomInt()).setSource(value).build());
+            }
+            return new SearchHits(hits.toArray(new SearchHit[hits.size()]), new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 1.0f);
+        }
+    }
+
+    private static class TestIterator extends BatchedDataIterator<String, Deque<String>> {
+        TestIterator(Client client, String jobId) {
+            super(client, jobId);
+        }
+
+        @Override
+        protected QueryBuilder getQuery() {
+            return QueryBuilders.matchAllQuery();
+        }
+
+        @Override
+        protected String map(SearchHit hit) {
+            return hit.getSourceAsString();
+        }
+
+        @Override
+        protected Deque<String> getCollection() {
+            return new ArrayDeque<>();
+        }
+
+        @Override
+        protected SortOrder sortOrder() {
+            return SortOrder.DESC;
+        }
+
+        @Override
+        protected String sortField() {
+            return "foo";
+        }
+    }
+    public class SearchHitBuilder {
+
+        private final SearchHit hit;
+        private final Map<String, DocumentField> fields;
+
+        public SearchHitBuilder(int docId) {
+            hit = new SearchHit(docId);
+            fields = new HashMap<>();
+        }
+
+        public SearchHitBuilder addField(String name, Object value) {
+            return addField(name, Arrays.asList(value));
+        }
+
+        public SearchHitBuilder addField(String name, List<Object> values) {
+            fields.put(name, new DocumentField(name, values));
+            return this;
+        }
+
+        public SearchHitBuilder setSource(String sourceJson) {
+            hit.sourceRef(new BytesArray(sourceJson));
+            return this;
+        }
+
+        public SearchHit build() {
+            if (!fields.isEmpty()) {
+                hit.fields(fields);
+            }
+            return hit;
+        }
+    }
+}

+ 14 - 2
x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.get_data_frame_transform.json

@@ -4,12 +4,24 @@
     "methods": [ "GET" ],
     "url": {
       "path": "/_data_frame/transforms/{transform_id}",
-      "paths": [ "/_data_frame/transforms/{transform_id}"],
+      "paths": [ "/_data_frame/transforms/{transform_id}", "/_data_frame/transforms"],
       "parts": {
         "transform_id": {
           "type": "string",
           "required": false,
-          "description": "The id of the transforms to get, '_all' or '*' implies get all transforms"
+          "description": "The id or comma delimited list of id expressions of the transforms to get, '_all' or '*' implies get all transforms"
+        }
+      },
+      "params": {
+        "from": {
+          "type": "int",
+          "required": false,
+          "description": "skips a number of transform configs, defaults to 0"
+        },
+        "size": {
+          "type": "int",
+          "required": false,
+          "description": "specifies a max number of transforms to get, defaults to 100"
         }
       }
     },

+ 38 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

@@ -105,3 +105,41 @@ setup:
   - match: { count: 2 }
   - match: { transforms.0.id: "airline-transform" }
   - match: { transforms.1.id: "airline-transform-dos" }
+
+  - do:
+      data_frame.get_data_frame_transform:
+        transform_id: "airline-transform,airline-transform-dos"
+  - match: { count: 2 }
+  - match: { transforms.0.id: "airline-transform" }
+  - match: { transforms.1.id: "airline-transform-dos" }
+
+  - do:
+      data_frame.get_data_frame_transform:
+        transform_id: "airline-transform*"
+  - match: { count: 2 }
+  - match: { transforms.0.id: "airline-transform" }
+  - match: { transforms.1.id: "airline-transform-dos" }
+
+  - do:
+      data_frame.get_data_frame_transform:
+        transform_id: "airline-transform*"
+        from: 0
+        size: 1
+  - match: { count: 1 }
+  - match: { transforms.0.id: "airline-transform" }
+
+  - do:
+      data_frame.get_data_frame_transform:
+        transform_id: "airline-transform*"
+        from: 1
+        size: 1
+  - match: { count: 1 }
+  - match: { transforms.0.id: "airline-transform-dos" }
+---
+"Test transform with invalid page parameter":
+  - do:
+      catch: /Param \[size\] has a max acceptable value of \[1000\]/
+      data_frame.get_data_frame_transform:
+        transform_id: "_all"
+        from: 0
+        size: 10000

+ 65 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_stats.yml

@@ -104,3 +104,68 @@ teardown:
   - do:
       data_frame.delete_data_frame_transform:
         transform_id: "airline-transform-stats-dos"
+
+---
+"Test get multiple transform stats where one does not have a task":
+  - do:
+      data_frame.put_data_frame_transform:
+        transform_id: "airline-transform-stats-dos"
+        body: >
+          {
+            "source": "airline-data",
+            "dest": "airline-data-by-airline-stats-dos",
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            }
+          }
+  - do:
+      data_frame.get_data_frame_transform_stats:
+        transform_id: "*"
+  - match: { count: 2 }
+  - match: { transforms.0.id: "airline-transform-stats" }
+  - match: { transforms.0.state.transform_state: "started" }
+  - match: { transforms.1.id: "airline-transform-stats-dos" }
+  - match: { transforms.1.state.transform_state: "stopped" }
+
+  - do:
+      data_frame.get_data_frame_transform_stats:
+        transform_id: "_all"
+  - match: { count: 2 }
+  - match: { transforms.0.id: "airline-transform-stats" }
+  - match: { transforms.0.state.transform_state: "started" }
+  - match: { transforms.1.id: "airline-transform-stats-dos" }
+  - match: { transforms.1.state.transform_state: "stopped" }
+
+---
+"Test get single transform stats when it does not have a task":
+
+  - do:
+      data_frame.put_data_frame_transform:
+        transform_id: "airline-transform-stats-dos"
+        body: >
+          {
+            "source": "airline-data",
+            "dest": "airline-data-by-airline-stats-dos",
+            "pivot": {
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            }
+          }
+  - do:
+      data_frame.get_data_frame_transform_stats:
+        transform_id: "airline-transform-stats-dos"
+  - match: { count: 1 }
+  - match: { transforms.0.id: "airline-transform-stats-dos" }
+  - match: { transforms.0.state.transform_state: "stopped" }
+  - match: { transforms.0.state.generation: 0 }
+  - match: { transforms.0.stats.pages_processed: 0 }
+  - match: { transforms.0.stats.documents_processed: 0 }
+  - match: { transforms.0.stats.documents_indexed: 0 }
+  - match: { transforms.0.stats.trigger_count: 0 }
+  - match: { transforms.0.stats.index_time_in_ms: 0 }
+  - match: { transforms.0.stats.index_total: 0 }
+  - match: { transforms.0.stats.index_failures: 0 }
+  - match: { transforms.0.stats.search_time_in_ms: 0 }
+  - match: { transforms.0.stats.search_total: 0 }
+  - match: { transforms.0.stats.search_failures: 0 }