Browse Source

[ML] Data Frame HLRC Get Stats API (#40327)

David Kyle 6 years ago
parent
commit
474ef8b4ed
27 changed files with 1310 additions and 247 deletions
  1. 42 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameClient.java
  2. 11 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java
  3. 177 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/core/IndexerJobStats.java
  4. 59 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/core/IndexerState.java
  5. 5 6
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/AcknowledgedTasksResponse.java
  6. 67 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/GetDataFrameTransformStatsRequest.java
  7. 106 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/GetDataFrameTransformStatsResponse.java
  8. 0 1
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StartDataFrameTransformResponse.java
  9. 0 1
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StopDataFrameTransformResponse.java
  10. 59 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStats.java
  11. 112 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java
  12. 92 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStats.java
  13. 7 177
      client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/GetRollupJobResponse.java
  14. 10 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java
  15. 52 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java
  16. 1 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/RollupIT.java
  17. 49 43
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/AcknowledgedTasksResponseTests.java
  18. 32 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/GetDataFrameTransformStatsRequestTests.java
  19. 99 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/GetDataFrameTransformStatsResponseTests.java
  20. 62 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStatsTests.java
  21. 56 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStatsTests.java
  22. 74 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java
  23. 79 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java
  24. 14 13
      client/rest-high-level/src/test/java/org/elasticsearch/client/rollup/GetRollupJobResponseTests.java
  25. 39 0
      docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc
  26. 2 0
      docs/java-rest/high-level/supported-apis.asciidoc
  27. 4 4
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java

+ 42 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameClient.java

@@ -22,6 +22,8 @@ package org.elasticsearch.client;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.core.AcknowledgedResponse;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
+import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
 import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.PreviewDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
@@ -82,6 +84,46 @@ public final class DataFrameClient {
                 Collections.emptySet());
     }
 
+    /**
+     * Get the running statistics of a Data Frame Transform
+     * <p>
+     * For additional info
+     * see <a href="https://www.TODO.com">Get Data Frame transform stats documentation</a>
+     *
+     * @param request Specifies the which transforms to get the stats for
+     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @return The Data Frame Transform stats
+     * @throws IOException when there is a serialization issue sending the request or receiving the response
+     */
+    public GetDataFrameTransformStatsResponse getDataFrameTransformStats(GetDataFrameTransformStatsRequest request, RequestOptions options)
+            throws IOException {
+        return restHighLevelClient.performRequestAndParseEntity(request,
+                DataFrameRequestConverters::getDataFrameTransformStats,
+                options,
+                GetDataFrameTransformStatsResponse::fromXContent,
+                Collections.emptySet());
+    }
+
+    /**
+     * Get the running statistics of a Data Frame Transform asynchronously and notifies listener on completion
+     * <p>
+     * For additional info
+     * see <a href="https://www.TODO.com">Get Data Frame transform stats documentation</a>
+     *
+     * @param request Specifies the which transforms to get the stats for
+     * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
+     * @param listener Listener to be notified upon request completion
+     */
+    public void getDataFrameTransformStatsAsync(GetDataFrameTransformStatsRequest request, RequestOptions options,
+                                           ActionListener<GetDataFrameTransformStatsResponse> listener) {
+        restHighLevelClient.performRequestAsyncAndParseEntity(request,
+                DataFrameRequestConverters::getDataFrameTransformStats,
+                options,
+                GetDataFrameTransformStatsResponse::fromXContent,
+                listener,
+                Collections.emptySet());
+    }
+
     /**
      * Delete a data frame transform
      * <p>

+ 11 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java

@@ -20,9 +20,11 @@
 package org.elasticsearch.client;
 
 import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
 import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
@@ -94,4 +96,13 @@ final class DataFrameRequestConverters {
         request.setEntity(createEntity(previewRequest, REQUEST_BODY_CONTENT_TYPE));
         return request;
     }
+
+    static Request getDataFrameTransformStats(GetDataFrameTransformStatsRequest statsRequest) {
+        String endpoint = new RequestConverters.EndpointBuilder()
+                .addPathPartAsIs("_data_frame", "transforms")
+                .addPathPart(statsRequest.getId())
+                .addPathPartAsIs("_stats")
+                .build();
+        return new Request(HttpGet.METHOD_NAME, endpoint);
+    }
 }

+ 177 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/core/IndexerJobStats.java

@@ -0,0 +1,177 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.core;
+
+import org.elasticsearch.common.ParseField;
+
+import java.util.Objects;
+
+public abstract class IndexerJobStats {
+    public static final String NAME = "data_frame_indexer_transform_stats";
+    public static ParseField NUM_PAGES = new ParseField("pages_processed");
+    public static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
+    public static ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("documents_indexed");
+    public static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
+    public static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
+    public static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
+    public static ParseField INDEX_TOTAL = new ParseField("index_total");
+    public static ParseField SEARCH_TOTAL = new ParseField("search_total");
+    public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
+    public static ParseField INDEX_FAILURES = new ParseField("index_failures");
+
+    private final long numPages;
+    private final long numInputDocuments;
+    private final long numOuputDocuments;
+    private final long numInvocations;
+    private final long indexTime;
+    private final long indexTotal;
+    private final long searchTime;
+    private final long searchTotal;
+    private final long indexFailures;
+    private final long searchFailures;
+
+    public IndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
+                           long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
+        this.numPages = numPages;
+        this.numInputDocuments = numInputDocuments;
+        this.numOuputDocuments = numOuputDocuments;
+        this.numInvocations = numInvocations;
+        this.indexTime = indexTime;
+        this.indexTotal = indexTotal;
+        this.searchTime = searchTime;
+        this.searchTotal = searchTotal;
+        this.indexFailures = indexFailures;
+        this.searchFailures = searchFailures;
+    }
+
+    /**
+     * The number of pages read from the input indices
+     */
+    public long getNumPages() {
+        return numPages;
+    }
+
+    /**
+     * The number of documents read from the input indices
+     */
+    public long getNumDocuments() {
+        return numInputDocuments;
+    }
+
+    /**
+     * Number of times that the job woke up to write documents
+     */
+    public long getNumInvocations() {
+        return numInvocations;
+    }
+
+    /**
+     * Number of documents written
+     */
+    public long getOutputDocuments() {
+        return numOuputDocuments;
+    }
+
+    /**
+     * Number of index failures that have occurred
+     */
+    public long getIndexFailures() {
+        return indexFailures;
+    }
+
+    /**
+     * Number of failures that have occurred
+     */
+    public long getSearchFailures() {
+        return searchFailures;
+    }
+
+    /**
+     * Returns the time spent indexing (cumulative) in milliseconds
+     */
+    public long getIndexTime() {
+        return indexTime;
+    }
+
+    /**
+     * Returns the time spent searching (cumulative) in milliseconds
+     */
+    public long getSearchTime() {
+        return searchTime;
+    }
+
+    /**
+     * Returns the total number of indexing requests that have been processed
+     * (Note: this is not the number of _documents_ that have been indexed)
+     */
+    public long getIndexTotal() {
+        return indexTotal;
+    }
+
+    /**
+     * Returns the total number of search requests that have been made
+     */
+    public long getSearchTotal() {
+        return searchTotal;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other instanceof IndexerJobStats == false) {
+            return false;
+        }
+
+        IndexerJobStats that = (IndexerJobStats) other;
+        return Objects.equals(this.numPages, that.numPages)
+                && Objects.equals(this.numInputDocuments, that.numInputDocuments)
+                && Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
+                && Objects.equals(this.numInvocations, that.numInvocations)
+                && Objects.equals(this.indexTime, that.indexTime)
+                && Objects.equals(this.searchTime, that.searchTime)
+                && Objects.equals(this.indexFailures, that.indexFailures)
+                && Objects.equals(this.searchFailures, that.searchFailures)
+                && Objects.equals(this.searchTotal, that.searchTotal)
+                && Objects.equals(this.indexTotal, that.indexTotal);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
+                indexTime, searchTime, indexFailures, searchFailures, searchTotal, indexTotal);
+    }
+
+    @Override
+    public final String toString() {
+        return "{pages=" + numPages
+                + ", input_docs=" + numInputDocuments
+                + ", output_docs=" + numOuputDocuments
+                + ", invocations=" + numInvocations
+                + ", index_failures=" + indexFailures
+                + ", search_failures=" + searchFailures
+                + ", index_time_in_ms=" + indexTime
+                + ", index_total=" + indexTotal
+                + ", search_time_in_ms=" + searchTime
+                + ", search_total=" + searchTotal+ "}";
+    }
+}

+ 59 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/core/IndexerState.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.core;
+
+
+import java.util.Locale;
+
+/**
+ * IndexerState represents the internal state of the indexer.  It
+ * is also persistent when changing from started/stopped in case the allocated
+ * task is restarted elsewhere.
+ */
+public enum IndexerState {
+    /** Indexer is running, but not actively indexing data (e.g. it's idle). */
+    STARTED,
+
+    /** Indexer is actively indexing data. */
+    INDEXING,
+
+    /**
+     * Transition state to where an indexer has acknowledged the stop
+     * but is still in process of halting.
+     */
+    STOPPING,
+
+    /** Indexer is "paused" and ignoring scheduled triggers. */
+    STOPPED,
+
+    /**
+     * Something (internal or external) has requested the indexer abort
+     * and shutdown.
+     */
+    ABORTING;
+
+    public static IndexerState fromString(String name) {
+        return valueOf(name.trim().toUpperCase(Locale.ROOT));
+    }
+
+    public String value() {
+        return name().toLowerCase(Locale.ROOT);
+    }
+}

+ 5 - 6
client/rest-high-level/src/main/java/org/elasticsearch/client/core/AcknowledgedTasksResponse.java → client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/AcknowledgedTasksResponse.java

@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.elasticsearch.client.core;
+package org.elasticsearch.client.dataframe;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.TaskOperationFailure;
@@ -26,7 +26,6 @@ import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -36,8 +35,8 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optiona
 
 public class AcknowledgedTasksResponse {
 
-    protected static final ParseField TASK_FAILURES = new ParseField("task_failures");
-    protected static final ParseField NODE_FAILURES = new ParseField("node_failures");
+    public static final ParseField TASK_FAILURES = new ParseField("task_failures");
+    public static final ParseField NODE_FAILURES = new ParseField("node_failures");
 
     @SuppressWarnings("unchecked")
     protected static <T extends AcknowledgedTasksResponse> ConstructingObjectParser<T, Void> generateParser(
@@ -60,8 +59,8 @@ public class AcknowledgedTasksResponse {
     public AcknowledgedTasksResponse(boolean acknowledged, @Nullable List<TaskOperationFailure> taskFailures,
                                      @Nullable List<? extends ElasticsearchException> nodeFailures) {
         this.acknowledged = acknowledged;
-        this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(taskFailures));
-        this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(nodeFailures));
+        this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(taskFailures);
+        this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(nodeFailures);
     }
 
     public boolean isAcknowledged() {

+ 67 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/GetDataFrameTransformStatsRequest.java

@@ -0,0 +1,67 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe;
+
+import org.elasticsearch.client.Validatable;
+import org.elasticsearch.client.ValidationException;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class GetDataFrameTransformStatsRequest implements Validatable {
+    private final String id;
+
+    public GetDataFrameTransformStatsRequest(String id) {
+        this.id = id;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public Optional<ValidationException> validate() {
+        if (id == null) {
+            ValidationException validationException = new ValidationException();
+            validationException.addValidationError("data frame transform id must not be null");
+            return Optional.of(validationException);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        GetDataFrameTransformStatsRequest other = (GetDataFrameTransformStatsRequest) obj;
+        return Objects.equals(id, other.id);
+    }
+}

+ 106 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/GetDataFrameTransformStatsResponse.java

@@ -0,0 +1,106 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+public class GetDataFrameTransformStatsResponse {
+
+    public static final ParseField TRANSFORMS = new ParseField("transforms");
+    public static final ParseField COUNT = new ParseField("count");
+
+    @SuppressWarnings("unchecked")
+    static final ConstructingObjectParser<GetDataFrameTransformStatsResponse, Void> PARSER = new ConstructingObjectParser<>(
+            "get_data_frame_transform_stats_response", true,
+            args -> new GetDataFrameTransformStatsResponse((List<DataFrameTransformStateAndStats>) args[0],
+                    (List<TaskOperationFailure>) args[1], (List<ElasticsearchException>) args[2]));
+
+    static {
+        PARSER.declareObjectArray(constructorArg(), DataFrameTransformStateAndStats.PARSER::apply, TRANSFORMS);
+        // Discard the count field which is the size of the transforms array
+        PARSER.declareInt((a, b) -> {}, COUNT);
+        PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p),
+                AcknowledgedTasksResponse.TASK_FAILURES);
+        PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p),
+                AcknowledgedTasksResponse.NODE_FAILURES);
+    }
+
+    public static GetDataFrameTransformStatsResponse fromXContent(final XContentParser parser) {
+        return GetDataFrameTransformStatsResponse.PARSER.apply(parser, null);
+    }
+
+    private final List<DataFrameTransformStateAndStats> transformsStateAndStats;
+    private final List<TaskOperationFailure> taskFailures;
+    private final List<ElasticsearchException> nodeFailures;
+
+    public GetDataFrameTransformStatsResponse(List<DataFrameTransformStateAndStats> transformsStateAndStats,
+                                              @Nullable List<TaskOperationFailure> taskFailures,
+                                              @Nullable List<? extends ElasticsearchException> nodeFailures) {
+        this.transformsStateAndStats = transformsStateAndStats;
+        this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(taskFailures);
+        this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(nodeFailures);
+    }
+
+    public List<DataFrameTransformStateAndStats> getTransformsStateAndStats() {
+        return transformsStateAndStats;
+    }
+
+    public List<ElasticsearchException> getNodeFailures() {
+        return nodeFailures;
+    }
+
+    public List<TaskOperationFailure> getTaskFailures() {
+        return taskFailures;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(transformsStateAndStats, nodeFailures, taskFailures);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        final GetDataFrameTransformStatsResponse that = (GetDataFrameTransformStatsResponse) other;
+        return Objects.equals(this.transformsStateAndStats, that.transformsStateAndStats)
+                && Objects.equals(this.nodeFailures, that.nodeFailures)
+                && Objects.equals(this.taskFailures, that.taskFailures);
+    }
+}

+ 0 - 1
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StartDataFrameTransformResponse.java

@@ -21,7 +21,6 @@ package org.elasticsearch.client.dataframe;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.TaskOperationFailure;
-import org.elasticsearch.client.core.AcknowledgedTasksResponse;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.XContentParser;

+ 0 - 1
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/StopDataFrameTransformResponse.java

@@ -21,7 +21,6 @@ package org.elasticsearch.client.dataframe;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.TaskOperationFailure;
-import org.elasticsearch.client.core.AcknowledgedTasksResponse;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.XContentParser;

+ 59 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStats.java

@@ -0,0 +1,59 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.client.core.IndexerJobStats;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+
+public class DataFrameIndexerTransformStats extends IndexerJobStats {
+
+    public static final ConstructingObjectParser<DataFrameIndexerTransformStats, Void> PARSER = new ConstructingObjectParser<>(
+            NAME, true, args -> new DataFrameIndexerTransformStats((long) args[0], (long) args[1], (long) args[2],
+            (long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
+
+    static {
+        PARSER.declareLong(constructorArg(), NUM_PAGES);
+        PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
+        PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
+        PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
+        PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
+        PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
+        PARSER.declareLong(constructorArg(), INDEX_TOTAL);
+        PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
+        PARSER.declareLong(constructorArg(), INDEX_FAILURES);
+        PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
+    }
+
+    public static DataFrameIndexerTransformStats fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    public DataFrameIndexerTransformStats(long numPages, long numInputDocuments, long numOuputDocuments,
+                                          long numInvocations, long indexTime, long searchTime,
+                                          long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
+        super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime,
+                indexTotal, searchTotal, indexFailures, searchFailures);
+    }
+}

+ 112 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformState.java

@@ -0,0 +1,112 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.client.core.IndexerState;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
+import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
+
+public class DataFrameTransformState {
+
+    private static final ParseField STATE = new ParseField("transform_state");
+    private static final ParseField CURRENT_POSITION = new ParseField("current_position");
+    private static final ParseField GENERATION = new ParseField("generation");
+
+    @SuppressWarnings("unchecked")
+    public static final ConstructingObjectParser<DataFrameTransformState, Void> PARSER =
+            new ConstructingObjectParser<>("data_frame_transform_state",
+                    args -> new DataFrameTransformState((IndexerState) args[0], (HashMap<String, Object>) args[1], (long) args[2]));
+
+    static {
+        PARSER.declareField(constructorArg(), p -> IndexerState.fromString(p.text()), STATE, ObjectParser.ValueType.STRING);
+        PARSER.declareField(optionalConstructorArg(), p -> {
+            if (p.currentToken() == XContentParser.Token.START_OBJECT) {
+                return p.map();
+            }
+            if (p.currentToken() == XContentParser.Token.VALUE_NULL) {
+                return null;
+            }
+            throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
+        }, CURRENT_POSITION, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
+        PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), GENERATION);
+    }
+
+    public static DataFrameTransformState fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    private final IndexerState state;
+    private final long generation;
+    private final SortedMap<String, Object> currentPosition;
+
+    public DataFrameTransformState(IndexerState state, @Nullable Map<String, Object> position, long generation) {
+        this.state = state;
+        this.currentPosition = position == null ? null : Collections.unmodifiableSortedMap(new TreeMap<>(position));
+        this.generation = generation;
+    }
+
+    public IndexerState getIndexerState() {
+        return state;
+    }
+
+    @Nullable
+    public Map<String, Object> getPosition() {
+        return currentPosition;
+    }
+
+    public long getGeneration() {
+        return generation;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        DataFrameTransformState that = (DataFrameTransformState) other;
+
+        return Objects.equals(this.state, that.state) && Objects.equals(this.currentPosition, that.currentPosition)
+                && this.generation == that.generation;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(state, currentPosition, generation);
+    }
+}

+ 92 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStats.java

@@ -0,0 +1,92 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public class DataFrameTransformStateAndStats {
+
+    public static final ParseField ID = new ParseField("id");
+    public static final ParseField STATE_FIELD = new ParseField("state");
+    public static final ParseField STATS_FIELD = new ParseField("stats");
+
+    public static final ConstructingObjectParser<DataFrameTransformStateAndStats, Void> PARSER = new ConstructingObjectParser<>(
+            "data_frame_transform_state_and_stats", true,
+            a -> new DataFrameTransformStateAndStats((String) a[0], (DataFrameTransformState) a[1], (DataFrameIndexerTransformStats) a[2]));
+
+    static {
+        PARSER.declareString(ConstructingObjectParser.constructorArg(), ID);
+        PARSER.declareObject(ConstructingObjectParser.constructorArg(), DataFrameTransformState.PARSER::apply, STATE_FIELD);
+        PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> DataFrameIndexerTransformStats.fromXContent(p),
+                STATS_FIELD);
+    }
+
+    public static DataFrameTransformStateAndStats fromXContent(XContentParser parser) throws IOException {
+        return PARSER.parse(parser, null);
+    }
+
+    private final String id;
+    private final DataFrameTransformState transformState;
+    private final DataFrameIndexerTransformStats transformStats;
+
+    public DataFrameTransformStateAndStats(String id, DataFrameTransformState state, DataFrameIndexerTransformStats stats) {
+        this.id = id;
+        this.transformState = state;
+        this.transformStats = stats;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public DataFrameIndexerTransformStats getTransformStats() {
+        return transformStats;
+    }
+
+    public DataFrameTransformState getTransformState() {
+        return transformState;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, transformState, transformStats);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        DataFrameTransformStateAndStats that = (DataFrameTransformStateAndStats) other;
+
+        return Objects.equals(this.id, that.id) && Objects.equals(this.transformState, that.transformState)
+                && Objects.equals(this.transformStats, that.transformStats);
+    }
+}

+ 7 - 177
client/rest-high-level/src/main/java/org/elasticsearch/client/rollup/GetRollupJobResponse.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.client.rollup;
 
+import org.elasticsearch.client.core.IndexerJobStats;
+import org.elasticsearch.client.core.IndexerState;
 import org.elasticsearch.client.rollup.job.config.RollupJobConfig;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@@ -27,7 +29,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 
@@ -44,19 +45,10 @@ public class GetRollupJobResponse {
     static final ParseField CONFIG = new ParseField("config");
     static final ParseField STATS = new ParseField("stats");
     static final ParseField STATUS = new ParseField("status");
-    static final ParseField NUM_PAGES = new ParseField("pages_processed");
-    static final ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed");
-    static final ParseField NUM_OUTPUT_DOCUMENTS = new ParseField("rollups_indexed");
-    static final ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
     static final ParseField STATE = new ParseField("job_state");
     static final ParseField CURRENT_POSITION = new ParseField("current_position");
+    static final ParseField ROLLUPS_INDEXED = new ParseField("rollups_indexed");
     static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id");
-    static final ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
-    static final ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
-    static final ParseField INDEX_TOTAL = new ParseField("index_total");
-    static final ParseField SEARCH_TOTAL = new ParseField("search_total");
-    static final ParseField SEARCH_FAILURES = new ParseField("search_failures");
-    static final ParseField INDEX_FAILURES = new ParseField("index_failures");
 
     private List<JobWrapper> jobs;
 
@@ -182,101 +174,12 @@ public class GetRollupJobResponse {
      * The Rollup specialization of stats for the AsyncTwoPhaseIndexer.
      * Note: instead of `documents_indexed`, this XContent show `rollups_indexed`
      */
-    public static class RollupIndexerJobStats {
-        private final long numPages;
-        private final long numInputDocuments;
-        private final long numOuputDocuments;
-        private final long numInvocations;
-        private long indexTime;
-        private long indexTotal;
-        private long searchTime;
-        private long searchTotal;
-        private long indexFailures;
-        private long searchFailures;
+    public static class RollupIndexerJobStats extends IndexerJobStats {
 
         RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
                               long indexTime, long indexTotal, long searchTime, long searchTotal, long indexFailures, long searchFailures) {
-            this.numPages = numPages;
-            this.numInputDocuments = numInputDocuments;
-            this.numOuputDocuments = numOuputDocuments;
-            this.numInvocations = numInvocations;
-            this.indexTime = indexTime;
-            this.indexTotal = indexTotal;
-            this.searchTime = searchTime;
-            this.searchTotal = searchTotal;
-            this.indexFailures = indexFailures;
-            this.searchFailures = searchFailures;
-        }
-
-        /**
-         * The number of pages read from the input indices.
-         */
-        public long getNumPages() {
-            return numPages;
-        }
-
-        /**
-         * The number of documents read from the input indices.
-         */
-        public long getNumDocuments() {
-            return numInputDocuments;
-        }
-
-        /**
-         * Number of times that the job woke up to write documents.
-         */
-        public long getNumInvocations() {
-            return numInvocations;
-        }
-
-        /**
-         * Number of documents written to the result indices.
-         */
-        public long getOutputDocuments() {
-            return numOuputDocuments;
-        }
-
-        /**
-         * Number of failures that have occurred during the bulk indexing phase of Rollup
-         */
-        public long getIndexFailures() {
-            return indexFailures;
-        }
-
-        /**
-         * Number of failures that have occurred during the search phase of Rollup
-         */
-        public long getSearchFailures() {
-            return searchFailures;
-        }
-
-        /**
-         * Returns the time spent indexing (cumulative) in milliseconds
-         */
-        public long getIndexTime() {
-            return indexTime;
-        }
-
-        /**
-         * Returns the time spent searching (cumulative) in milliseconds
-         */
-        public long getSearchTime() {
-            return searchTime;
-        }
-
-        /**
-         * Returns the total number of indexing requests that have been sent by the rollup job
-         * (Note: this is not the number of _documents_ that have been indexed)
-         */
-        public long getIndexTotal() {
-            return indexTotal;
-        }
-
-        /**
-         * Returns the total number of search requests that have been sent by the rollup job
-         */
-        public long getSearchTotal() {
-            return searchTotal;
+            super(numPages, numInputDocuments, numOuputDocuments, numInvocations,
+                    indexTime, searchTime, indexTotal, searchTotal, indexFailures, searchFailures);
         }
 
         private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>(
@@ -287,7 +190,7 @@ public class GetRollupJobResponse {
         static {
             PARSER.declareLong(constructorArg(), NUM_PAGES);
             PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
-            PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
+            PARSER.declareLong(constructorArg(), ROLLUPS_INDEXED);
             PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
             PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
             PARSER.declareLong(constructorArg(), INDEX_TOTAL);
@@ -296,43 +199,6 @@ public class GetRollupJobResponse {
             PARSER.declareLong(constructorArg(), INDEX_FAILURES);
             PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
         }
-
-        @Override
-        public boolean equals(Object other) {
-            if (this == other) return true;
-            if (other == null || getClass() != other.getClass()) return false;
-            RollupIndexerJobStats that = (RollupIndexerJobStats) other;
-            return Objects.equals(this.numPages, that.numPages)
-                && Objects.equals(this.numInputDocuments, that.numInputDocuments)
-                && Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
-                && Objects.equals(this.numInvocations, that.numInvocations)
-                && Objects.equals(this.indexTime, that.indexTime)
-                && Objects.equals(this.searchTime, that.searchTime)
-                && Objects.equals(this.indexFailures, that.indexFailures)
-                && Objects.equals(this.searchFailures, that.searchFailures)
-                && Objects.equals(this.searchTotal, that.searchTotal)
-                && Objects.equals(this.indexTotal, that.indexTotal);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
-                indexTime, searchTime, indexFailures, searchFailures, searchTotal, indexTotal);
-        }
-
-        @Override
-        public final String toString() {
-            return "{pages=" + numPages
-                    + ", input_docs=" + numInputDocuments
-                    + ", output_docs=" + numOuputDocuments
-                    + ", invocations=" + numInvocations
-                    + ", index_failures=" + indexFailures
-                    + ", search_failures=" + searchFailures
-                    + ", index_time_in_ms=" + indexTime
-                    + ", index_total=" + indexTotal
-                    + ", search_time_in_ms=" + searchTime
-                    + ", search_total=" + searchTotal+ "}";
-        }
     }
 
     /**
@@ -417,40 +283,4 @@ public class GetRollupJobResponse {
                     + ", upgradedDocumentId=" + upgradedDocumentId + "}";
         }
     }
-
-    /**
-     * IndexerState represents the internal state of the indexer.  It
-     * is also persistent when changing from started/stopped in case the allocated
-     * task is restarted elsewhere.
-     */
-    public enum IndexerState {
-        /** Indexer is running, but not actively indexing data (e.g. it's idle). */
-        STARTED,
-
-        /** Indexer is actively indexing data. */
-        INDEXING,
-
-        /**
-         * Transition state to where an indexer has acknowledged the stop
-         * but is still in process of halting.
-         */
-        STOPPING,
-
-        /** Indexer is "paused" and ignoring scheduled triggers. */
-        STOPPED,
-
-        /**
-         * Something (internal or external) has requested the indexer abort
-         * and shutdown.
-         */
-        ABORTING;
-
-        static IndexerState fromString(String name) {
-            return valueOf(name.trim().toUpperCase(Locale.ROOT));
-        }
-
-        String value() {
-            return name().toLowerCase(Locale.ROOT);
-        }
-    }
 }

+ 10 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java

@@ -20,9 +20,11 @@
 package org.elasticsearch.client;
 
 import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpPut;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
 import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
@@ -137,4 +139,12 @@ public class DataFrameRequestConvertersTests extends ESTestCase {
             assertThat(parsedConfig, equalTo(previewRequest.getConfig()));
         }
     }
+
+    public void testGetDataFrameTransformStats() {
+        GetDataFrameTransformStatsRequest getStatsRequest = new GetDataFrameTransformStatsRequest("foo");
+        Request request = DataFrameRequestConverters.getDataFrameTransformStats(getStatsRequest);
+
+        assertEquals(HttpGet.METHOD_NAME, request.getMethod());
+        assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/foo/_stats"));
+    }
 }

+ 52 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java

@@ -25,7 +25,10 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.core.AcknowledgedResponse;
+import org.elasticsearch.client.core.IndexerState;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
+import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
 import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.PreviewDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
@@ -33,7 +36,9 @@ import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
+import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
 import org.elasticsearch.client.dataframe.transforms.DestConfig;
 import org.elasticsearch.client.dataframe.transforms.QueryConfig;
 import org.elasticsearch.client.dataframe.transforms.SourceConfig;
@@ -201,7 +206,10 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         assertThat(startResponse.getNodeFailures(), empty());
         assertThat(startResponse.getTaskFailures(), empty());
 
-        // TODO once get df stats is implemented assert the df has started
+        GetDataFrameTransformStatsResponse statsResponse = execute(new GetDataFrameTransformStatsRequest(id),
+                client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
+        assertThat(statsResponse.getTransformsStateAndStats(), hasSize(1));
+        assertEquals(IndexerState.STARTED, statsResponse.getTransformsStateAndStats().get(0).getTransformState().getIndexerState());
 
         StopDataFrameTransformRequest stopRequest = new StopDataFrameTransformRequest(id);
         StopDataFrameTransformResponse stopResponse =
@@ -241,5 +249,48 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
         assertTrue(michel.isPresent());
         assertEquals(3.6d, (double)michel.get().get("avg_rating"), 0.1d);
     }
+
+    public void testGetStats() throws Exception {
+        String sourceIndex = "transform-source";
+        createIndex(sourceIndex);
+        indexData(sourceIndex);
+
+        QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
+        GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
+        AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
+        aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
+        AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
+        PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
+
+        String id = "test-get-stats";
+        DataFrameTransformConfig transform = new DataFrameTransformConfig(id,
+                new SourceConfig(new String[]{sourceIndex}, queryConfig), new DestConfig("pivot-dest"), pivotConfig);
+
+        DataFrameClient client = highLevelClient().dataFrame();
+        AcknowledgedResponse ack = execute(new PutDataFrameTransformRequest(transform), client::putDataFrameTransform,
+                client::putDataFrameTransformAsync);
+        assertTrue(ack.isAcknowledged());
+        transformsToClean.add(id);
+
+        GetDataFrameTransformStatsResponse statsResponse = execute(new GetDataFrameTransformStatsRequest(id),
+                client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
+
+        assertEquals(1, statsResponse.getTransformsStateAndStats().size());
+        DataFrameTransformStateAndStats stats = statsResponse.getTransformsStateAndStats().get(0);
+        assertEquals(IndexerState.STOPPED, stats.getTransformState().getIndexerState());
+
+        DataFrameIndexerTransformStats zeroIndexerStats = new DataFrameIndexerTransformStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+        assertEquals(zeroIndexerStats, stats.getTransformStats());
+
+        // start the transform
+        execute(new StartDataFrameTransformRequest(id), client::startDataFrameTransform, client::startDataFrameTransformAsync);
+        assertBusy(() -> {
+            GetDataFrameTransformStatsResponse response = execute(new GetDataFrameTransformStatsRequest(id),
+                    client::getDataFrameTransformStats, client::getDataFrameTransformStatsAsync);
+            DataFrameTransformStateAndStats stateAndStats = response.getTransformsStateAndStats().get(0);
+            assertEquals(IndexerState.STARTED, stateAndStats.getTransformState().getIndexerState());
+            assertNotEquals(zeroIndexerStats, stateAndStats.getTransformStats());
+        });
+    }
 }
 

+ 1 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/RollupIT.java

@@ -29,6 +29,7 @@ import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.client.core.IndexerState;
 import org.elasticsearch.client.core.AcknowledgedResponse;
 import org.elasticsearch.client.rollup.DeleteRollupJobRequest;
 import org.elasticsearch.client.rollup.GetRollupCapsRequest;
@@ -37,7 +38,6 @@ import org.elasticsearch.client.rollup.GetRollupIndexCapsRequest;
 import org.elasticsearch.client.rollup.GetRollupIndexCapsResponse;
 import org.elasticsearch.client.rollup.GetRollupJobRequest;
 import org.elasticsearch.client.rollup.GetRollupJobResponse;
-import org.elasticsearch.client.rollup.GetRollupJobResponse.IndexerState;
 import org.elasticsearch.client.rollup.GetRollupJobResponse.JobWrapper;
 import org.elasticsearch.client.rollup.PutRollupJobRequest;
 import org.elasticsearch.client.rollup.StartRollupJobRequest;

+ 49 - 43
client/rest-high-level/src/test/java/org/elasticsearch/client/core/AcknowledgedTasksResponseTests.java → client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/AcknowledgedTasksResponseTests.java

@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.elasticsearch.client.core;
+package org.elasticsearch.client.dataframe;
 
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.TaskOperationFailure;
@@ -42,7 +42,7 @@ public class AcknowledgedTasksResponseTests extends ESTestCase {
                 this::createTestInstance,
                 AcknowledgedTasksResponseTests::toXContent,
                 AcknowledgedTasksResponseTests::fromXContent)
-                .assertEqualsConsumer(this::assertEqualInstances)
+                .assertEqualsConsumer(AcknowledgedTasksResponseTests::assertEqualInstances)
                 .assertToXContentEquivalence(false)
                 .supportsUnknownFields(false)
                 .test();
@@ -50,32 +50,15 @@ public class AcknowledgedTasksResponseTests extends ESTestCase {
 
     // Serialisation of TaskOperationFailure and ElasticsearchException changes
     // the object so use a custom compare method rather than Object.equals
-    private void assertEqualInstances(AcknowledgedTasksResponse expected, AcknowledgedTasksResponse actual) {
+    private static void assertEqualInstances(AcknowledgedTasksResponse expected, AcknowledgedTasksResponse actual) {
         assertNotSame(expected, actual);
         assertEquals(expected.isAcknowledged(), actual.isAcknowledged());
 
-        List<TaskOperationFailure> expectedTaskFailures = expected.getTaskFailures();
-        List<TaskOperationFailure> actualTaskFailures = actual.getTaskFailures();
-
-        assertListEquals(expectedTaskFailures, actualTaskFailures, (a, b) ->
-                Objects.equals(a.getNodeId(), b.getNodeId())
-                        && Objects.equals(a.getTaskId(), b.getTaskId())
-                        && Objects.equals(a.getStatus(), b.getStatus())
-        );
-
-        List<ElasticsearchException> expectedExceptions = expected.getNodeFailures();
-        List<ElasticsearchException> actualExceptions = actual.getNodeFailures();
-
-        // actualException is a wrapped copy of expectedException so the
-        // error messages won't be the same but actualException should contain
-        // the error message from expectedException
-        assertListEquals(expectedExceptions, actualExceptions, (expectedException, actualException) -> {
-            assertThat(actualException.getDetailedMessage(), containsString(expectedException.getMessage()));
-            return true;
-        });
+        assertTaskOperationFailuresEqual(expected.getTaskFailures(), actual.getTaskFailures());
+        assertNodeFailuresEqual(expected.getNodeFailures(), actual.getNodeFailures());
     }
 
-    private <T> void assertListEquals(List<T> expected, List<T> actual, BiPredicate<T, T> comparator) {
+    private static <T> void assertListEquals(List<T> expected, List<T> actual, BiPredicate<T, T> comparator) {
         if (expected == null) {
             assertNull(actual);
             return;
@@ -89,6 +72,26 @@ public class AcknowledgedTasksResponseTests extends ESTestCase {
         }
     }
 
+    public static void assertTaskOperationFailuresEqual(List<TaskOperationFailure> expected,
+                                                        List<TaskOperationFailure> actual) {
+        assertListEquals(expected, actual, (a, b) ->
+                Objects.equals(a.getNodeId(), b.getNodeId())
+                        && Objects.equals(a.getTaskId(), b.getTaskId())
+                        && Objects.equals(a.getStatus(), b.getStatus())
+        );
+    }
+
+    public static void assertNodeFailuresEqual(List<ElasticsearchException> expected,
+                                               List<ElasticsearchException> actual) {
+        // actualException is a wrapped copy of expectedException so the
+        // error messages won't be the same but actualException should contain
+        // the error message from expectedException
+        assertListEquals(expected, actual, (expectedException, actualException) -> {
+            assertThat(actualException.getDetailedMessage(), containsString(expectedException.getMessage()));
+            return true;
+        });
+    }
+
     private static AcknowledgedTasksResponse fromXContent(XContentParser parser) {
         return AcknowledgedTasksResponse.generateParser("ack_tasks_response",
                 AcknowledgedTasksResponse::new, "acknowleged")
@@ -120,32 +123,35 @@ public class AcknowledgedTasksResponseTests extends ESTestCase {
         builder.startObject();
         {
             builder.field("acknowleged", response.isAcknowledged());
+            taskFailuresToXContent(response.getTaskFailures(), builder);
+            nodeFailuresToXContent(response.getNodeFailures(), builder);
+        }
+        builder.endObject();
+    }
 
-            List<TaskOperationFailure> taskFailures = response.getTaskFailures();
-            if (taskFailures != null && taskFailures.isEmpty() == false) {
-                builder.startArray(AcknowledgedTasksResponse.TASK_FAILURES.getPreferredName());
-                for (TaskOperationFailure failure : taskFailures) {
-                    builder.startObject();
-                    failure.toXContent(builder, ToXContent.EMPTY_PARAMS);
-                    builder.endObject();
-                }
-                builder.endArray();
+    public static void taskFailuresToXContent(List<TaskOperationFailure> taskFailures, XContentBuilder builder) throws IOException {
+        if (taskFailures != null && taskFailures.isEmpty() == false) {
+            builder.startArray(AcknowledgedTasksResponse.TASK_FAILURES.getPreferredName());
+            for (TaskOperationFailure failure : taskFailures) {
+                builder.startObject();
+                failure.toXContent(builder, ToXContent.EMPTY_PARAMS);
+                builder.endObject();
             }
+            builder.endArray();
+        }
+    }
 
-            List<ElasticsearchException> nodeFailures = response.getNodeFailures();
-            if (nodeFailures != null && nodeFailures.isEmpty() == false) {
-                builder.startArray(AcknowledgedTasksResponse.NODE_FAILURES.getPreferredName());
-                for (ElasticsearchException failure : nodeFailures) {
-                    builder.startObject();
-                    failure.toXContent(builder, ToXContent.EMPTY_PARAMS);
-                    builder.endObject();
-                }
-                builder.endArray();
+    public static void nodeFailuresToXContent(List<ElasticsearchException> nodeFailures, XContentBuilder builder) throws IOException {
+        if (nodeFailures != null && nodeFailures.isEmpty() == false) {
+            builder.startArray(AcknowledgedTasksResponse.NODE_FAILURES.getPreferredName());
+            for (ElasticsearchException failure : nodeFailures) {
+                builder.startObject();
+                failure.toXContent(builder, ToXContent.EMPTY_PARAMS);
+                builder.endObject();
             }
+            builder.endArray();
         }
-        builder.endObject();
     }
-
 }
 
 

+ 32 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/GetDataFrameTransformStatsRequestTests.java

@@ -0,0 +1,32 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe;
+
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.containsString;
+
+public class GetDataFrameTransformStatsRequestTests extends ESTestCase {
+    public void testValidate() {
+        assertFalse(new GetDataFrameTransformStatsRequest("valid-id").validate().isPresent());
+        assertThat(new GetDataFrameTransformStatsRequest(null).validate().get().getMessage(),
+                containsString("data frame transform id must not be null"));
+    }
+}

+ 99 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/GetDataFrameTransformStatsResponseTests.java

@@ -0,0 +1,99 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.TaskOperationFailure;
+import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
+import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStatsTests;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
+
+public class GetDataFrameTransformStatsResponseTests extends ESTestCase {
+
+    public void testXContentParser() throws IOException {
+        xContentTester(this::createParser,
+                GetDataFrameTransformStatsResponseTests::createTestInstance,
+                GetDataFrameTransformStatsResponseTests::toXContent,
+                GetDataFrameTransformStatsResponse::fromXContent)
+                .assertEqualsConsumer(GetDataFrameTransformStatsResponseTests::assertEqualInstances)
+                .assertToXContentEquivalence(false)
+                .supportsUnknownFields(true)
+                .randomFieldsExcludeFilter(path -> path.isEmpty() == false)
+                .test();
+    }
+
+    private static GetDataFrameTransformStatsResponse createTestInstance() {
+        int count = randomIntBetween(1, 3);
+        List<DataFrameTransformStateAndStats> stats = new ArrayList<>();
+        for (int i=0; i<count; i++) {
+            stats.add(DataFrameTransformStateAndStatsTests.randomInstance());
+        }
+
+        List<TaskOperationFailure> taskFailures = null;
+        if (randomBoolean()) {
+            taskFailures = new ArrayList<>();
+            int numTaskFailures = randomIntBetween(1, 4);
+            for (int i=0; i<numTaskFailures; i++) {
+                taskFailures.add(new TaskOperationFailure(randomAlphaOfLength(4), randomNonNegativeLong(), new IllegalStateException()));
+            }
+        }
+        List<ElasticsearchException> nodeFailures = null;
+        if (randomBoolean()) {
+            nodeFailures = new ArrayList<>();
+            int numNodeFailures = randomIntBetween(1, 4);
+            for (int i=0; i<numNodeFailures; i++) {
+                nodeFailures.add(new ElasticsearchException("GetDataFrameTransformStatsResponseTests"));
+            }
+        }
+
+        return new GetDataFrameTransformStatsResponse(stats, taskFailures, nodeFailures);
+    }
+
+    private static void toXContent(GetDataFrameTransformStatsResponse response, XContentBuilder builder) throws IOException {
+        builder.startObject();
+        {
+            builder.startArray("transforms");
+            for (DataFrameTransformStateAndStats stats : response.getTransformsStateAndStats()) {
+                DataFrameTransformStateAndStatsTests.toXContent(stats, builder);
+            }
+            builder.endArray();
+
+            AcknowledgedTasksResponseTests.taskFailuresToXContent(response.getTaskFailures(), builder);
+            AcknowledgedTasksResponseTests.nodeFailuresToXContent(response.getNodeFailures(), builder);
+        }
+        builder.endObject();
+    }
+
+    // Serialisation of TaskOperationFailure and ElasticsearchException changes
+    // the object so use a custom compare method rather than Object.equals
+    private static void assertEqualInstances(GetDataFrameTransformStatsResponse expected,
+                                             GetDataFrameTransformStatsResponse actual) {
+        assertEquals(expected.getTransformsStateAndStats(), actual.getTransformsStateAndStats());
+        AcknowledgedTasksResponseTests.assertTaskOperationFailuresEqual(expected.getTaskFailures(), actual.getTaskFailures());
+        AcknowledgedTasksResponseTests.assertNodeFailuresEqual(expected.getNodeFailures(), actual.getNodeFailures());
+    }
+}

+ 62 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameIndexerTransformStatsTests.java

@@ -0,0 +1,62 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.client.core.IndexerJobStats;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
+
+public class DataFrameIndexerTransformStatsTests extends ESTestCase {
+
+    public void testFromXContent() throws IOException {
+        xContentTester(
+                this::createParser,
+                DataFrameIndexerTransformStatsTests::randomStats,
+                DataFrameIndexerTransformStatsTests::toXContent,
+                DataFrameIndexerTransformStats::fromXContent)
+                .supportsUnknownFields(true)
+                .test();
+    }
+
+    public static DataFrameIndexerTransformStats randomStats() {
+        return new DataFrameIndexerTransformStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
+                randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
+                randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
+    }
+
+    public static void toXContent(DataFrameIndexerTransformStats stats, XContentBuilder builder) throws IOException {
+        builder.startObject();
+        builder.field(IndexerJobStats.NUM_PAGES.getPreferredName(), stats.getNumPages());
+        builder.field(IndexerJobStats.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
+        builder.field(IndexerJobStats.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
+        builder.field(IndexerJobStats.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
+        builder.field(IndexerJobStats.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
+        builder.field(IndexerJobStats.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
+        builder.field(IndexerJobStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
+        builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
+        builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
+        builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
+        builder.endObject();
+    }
+}

+ 56 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateAndStatsTests.java

@@ -0,0 +1,56 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
+
+public class DataFrameTransformStateAndStatsTests extends ESTestCase {
+
+    public void testFromXContent() throws IOException {
+        xContentTester(this::createParser,
+                DataFrameTransformStateAndStatsTests::randomInstance,
+                DataFrameTransformStateAndStatsTests::toXContent,
+                DataFrameTransformStateAndStats::fromXContent)
+                .supportsUnknownFields(true)
+                .randomFieldsExcludeFilter(field -> field.startsWith("state"))
+                .test();
+    }
+
+    public static DataFrameTransformStateAndStats randomInstance() {
+        return new DataFrameTransformStateAndStats(randomAlphaOfLength(10),
+                DataFrameTransformStateTests.randomDataFrameTransformState(),
+                DataFrameIndexerTransformStatsTests.randomStats());
+    }
+
+    public static void toXContent(DataFrameTransformStateAndStats stateAndStats, XContentBuilder builder) throws IOException {
+        builder.startObject();
+        builder.field(DataFrameTransformStateAndStats.ID.getPreferredName(), stateAndStats.getId());
+        builder.field(DataFrameTransformStateAndStats.STATE_FIELD.getPreferredName());
+        DataFrameTransformStateTests.toXContent(stateAndStats.getTransformState(), builder);
+        builder.field(DataFrameTransformStateAndStats.STATS_FIELD.getPreferredName());
+        DataFrameIndexerTransformStatsTests.toXContent(stateAndStats.getTransformStats(), builder);
+        builder.endObject();
+    }
+}

+ 74 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformStateTests.java

@@ -0,0 +1,74 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.dataframe.transforms;
+
+import org.elasticsearch.client.core.IndexerState;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
+
+public class DataFrameTransformStateTests extends ESTestCase {
+
+    public void testFromXContent() throws IOException {
+        xContentTester(this::createParser,
+                DataFrameTransformStateTests::randomDataFrameTransformState,
+                DataFrameTransformStateTests::toXContent,
+                DataFrameTransformState::fromXContent)
+                .supportsUnknownFields(false)
+                .test();
+    }
+
+    public static DataFrameTransformState randomDataFrameTransformState() {
+        return new DataFrameTransformState(randomFrom(IndexerState.values()), randomPositionMap(), randomLongBetween(0,10));
+    }
+
+    public static void toXContent(DataFrameTransformState state, XContentBuilder builder) throws IOException {
+        builder.startObject();
+        builder.field("transform_state", state.getIndexerState().value());
+        if (state.getPosition() != null) {
+            builder.field("current_position", state.getPosition());
+        }
+        builder.field("generation", state.getGeneration());
+        builder.endObject();
+    }
+
+    private static Map<String, Object> randomPositionMap() {
+        if (randomBoolean()) {
+            return null;
+        }
+        int numFields = randomIntBetween(1, 5);
+        Map<String, Object> position = new HashMap<>();
+        for (int i = 0; i < numFields; i++) {
+            Object value;
+            if (randomBoolean()) {
+                value = randomLong();
+            } else {
+                value = randomAlphaOfLengthBetween(1, 10);
+            }
+            position.put(randomAlphaOfLengthBetween(3, 10), value);
+        }
+        return position;
+    }
+}

+ 79 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

@@ -25,7 +25,10 @@ import org.elasticsearch.client.ESRestHighLevelClientTestCase;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.core.AcknowledgedResponse;
+import org.elasticsearch.client.core.IndexerState;
 import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
+import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsRequest;
+import org.elasticsearch.client.dataframe.GetDataFrameTransformStatsResponse;
 import org.elasticsearch.client.dataframe.PreviewDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.PreviewDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
@@ -33,7 +36,9 @@ import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
 import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
 import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;
+import org.elasticsearch.client.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.client.dataframe.transforms.DataFrameTransformConfig;
+import org.elasticsearch.client.dataframe.transforms.DataFrameTransformStateAndStats;
 import org.elasticsearch.client.dataframe.transforms.DestConfig;
 import org.elasticsearch.client.dataframe.transforms.QueryConfig;
 import org.elasticsearch.client.dataframe.transforms.SourceConfig;
@@ -58,6 +63,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.hasSize;
 
 public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTestCase {
 
@@ -425,4 +431,77 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
             assertTrue(latch.await(30L, TimeUnit.SECONDS));
         }
     }
+
+    public void testGetStats() throws IOException, InterruptedException {
+        createIndex("source-data");
+
+        RestHighLevelClient client = highLevelClient();
+
+        QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
+        GroupConfig groupConfig = new GroupConfig(Collections.singletonMap("reviewer", new TermsGroupSource("user_id")));
+        AggregatorFactories.Builder aggBuilder = new AggregatorFactories.Builder();
+        aggBuilder.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
+        AggregationConfig aggConfig = new AggregationConfig(aggBuilder);
+        PivotConfig pivotConfig = new PivotConfig(groupConfig, aggConfig);
+
+        String id = "statisitcal-transform";
+        DataFrameTransformConfig transformConfig = new DataFrameTransformConfig(id,
+                new SourceConfig(new String[]{"source-data"}, queryConfig), new DestConfig("dest"), pivotConfig);
+        client.dataFrame().putDataFrameTransform(new PutDataFrameTransformRequest(transformConfig), RequestOptions.DEFAULT);
+
+        // tag::get-data-frame-transform-stats-request
+        GetDataFrameTransformStatsRequest request =
+                new GetDataFrameTransformStatsRequest(id); // <1>
+        // end::get-data-frame-transform-stats-request
+
+        {
+            // tag::get-data-frame-transform-stats-execute
+            GetDataFrameTransformStatsResponse response =
+                client.dataFrame()
+                    .getDataFrameTransformStats(request, RequestOptions.DEFAULT);
+            // end::get-data-frame-transform-stats-execute
+
+            assertThat(response.getTransformsStateAndStats(), hasSize(1));
+
+            // tag::get-data-frame-transform-stats-response
+            DataFrameTransformStateAndStats stateAndStats =
+                    response.getTransformsStateAndStats().get(0);   // <1>
+            IndexerState indexerState =
+                    stateAndStats.getTransformState().getIndexerState();  // <2>
+            DataFrameIndexerTransformStats transformStats =
+                    stateAndStats.getTransformStats();              // <3>
+            // end::get-data-frame-transform-stats-response
+
+            assertEquals(IndexerState.STOPPED, indexerState);
+            assertNotNull(transformStats);
+        }
+        {
+            // tag::get-data-frame-transform-stats-execute-listener
+            ActionListener<GetDataFrameTransformStatsResponse> listener =
+                new ActionListener<GetDataFrameTransformStatsResponse>() {
+                    @Override
+                    public void onResponse(
+                            GetDataFrameTransformStatsResponse response) {
+                        // <1>
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        // <2>
+                    }
+                };
+            // end::get-data-frame-transform-stats-execute-listener
+
+            // Replace the empty listener by a blocking listener in test
+            final CountDownLatch latch = new CountDownLatch(1);
+            listener = new LatchedActionListener<>(listener, latch);
+
+            // tag::get-data-frame-transform-stats-execute-async
+            client.dataFrame().getDataFrameTransformStatsAsync(
+                    request, RequestOptions.DEFAULT, listener);  // <1>
+            // end::get-data-frame-transform-stats-execute-async
+
+            assertTrue(latch.await(30L, TimeUnit.SECONDS));
+        }
+    }
 }

+ 14 - 13
client/rest-high-level/src/test/java/org/elasticsearch/client/rollup/GetRollupJobResponseTests.java

@@ -19,13 +19,14 @@
 
 package org.elasticsearch.client.rollup;
 
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.client.rollup.GetRollupJobResponse.IndexerState;
+import org.elasticsearch.client.core.IndexerJobStats;
+import org.elasticsearch.client.core.IndexerState;
 import org.elasticsearch.client.rollup.GetRollupJobResponse.JobWrapper;
 import org.elasticsearch.client.rollup.GetRollupJobResponse.RollupIndexerJobStats;
 import org.elasticsearch.client.rollup.GetRollupJobResponse.RollupJobStatus;
 import org.elasticsearch.client.rollup.job.config.RollupJobConfigTests;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
@@ -113,16 +114,16 @@ public class GetRollupJobResponseTests extends ESTestCase {
 
     public void toXContent(RollupIndexerJobStats stats, XContentBuilder builder, ToXContent.Params params) throws IOException {
         builder.startObject();
-        builder.field(GetRollupJobResponse.NUM_PAGES.getPreferredName(), stats.getNumPages());
-        builder.field(GetRollupJobResponse.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
-        builder.field(GetRollupJobResponse.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
-        builder.field(GetRollupJobResponse.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
-        builder.field(GetRollupJobResponse.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
-        builder.field(GetRollupJobResponse.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
-        builder.field(GetRollupJobResponse.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
-        builder.field(GetRollupJobResponse.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
-        builder.field(GetRollupJobResponse.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
-        builder.field(GetRollupJobResponse.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
+        builder.field(IndexerJobStats.NUM_PAGES.getPreferredName(), stats.getNumPages());
+        builder.field(IndexerJobStats.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
+        builder.field(GetRollupJobResponse.ROLLUPS_INDEXED.getPreferredName(), stats.getOutputDocuments());
+        builder.field(IndexerJobStats.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
+        builder.field(IndexerJobStats.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
+        builder.field(IndexerJobStats.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
+        builder.field(IndexerJobStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
+        builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
+        builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
+        builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
         builder.endObject();
     }
 

+ 39 - 0
docs/java-rest/high-level/dataframe/get_data_frame_stats.asciidoc

@@ -0,0 +1,39 @@
+--
+:api: get-data-frame-transform-stats
+:request: GetDataFrameTransformStatsRequest
+:response: GetDataFrameTransformStatsResponse
+--
+[id="{upid}-{api}"]
+=== Get Data Frame Transform Stats API
+
+The Get Data Frame Transform Stats API is used read the operational statistics
+of one or more {dataframe-transform}s.
+The API accepts a +{request}+ object and returns a +{response}+.
+
+[id="{upid}-{api}-request"]
+==== Get Data Frame Transform Stats Request
+
+A +{request}+ requires a data frame transform id or the special wildcard `_all`
+to get the statistics for all {dataframe-transform}s
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-request]
+--------------------------------------------------
+<1> Constructing a new GET Stats request referencing an existing {dataframe-transform}
+
+
+include::../execution.asciidoc[]
+
+[id="{upid}-{api}-response"]
+==== Response
+
+The returned +{response}+ contains the requested {dataframe-transform} statistics.
+
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-response]
+--------------------------------------------------
+<1> The response contains a list of `DataFrameTransformStateAndStats` objects
+<2> The running state of the transform e.g `started`
+<3> The transform progress statistics recording the number of documents indexed etc

+ 2 - 0
docs/java-rest/high-level/supported-apis.asciidoc

@@ -554,12 +554,14 @@ include::ilm/remove_lifecycle_policy_from_index.asciidoc[]
 
 The Java High Level REST Client supports the following Data Frame APIs:
 
+* <<{upid}-get-data-frame-transform-stats>>
 * <<{upid}-put-data-frame-transform>>
 * <<{upid}-delete-data-frame-transform>>
 * <<{upid}-preview-data-frame-transform>>
 * <<{upid}-start-data-frame-transform>>
 * <<{upid}-stop-data-frame-transform>>
 
+include::dataframe/get_data_frame_stats.asciidoc[]
 include::dataframe/put_data_frame.asciidoc[]
 include::dataframe/delete_data_frame.asciidoc[]
 include::dataframe/preview_data_frame.asciidoc[]

+ 4 - 4
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java

@@ -21,21 +21,21 @@ import org.elasticsearch.env.Environment;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.plugins.Platforms;
 import org.elasticsearch.xpack.core.XPackFeatureSet;
+import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.XPackSettings;
-import org.elasticsearch.xpack.core.XPackField;
 import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
 import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
 import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
 import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
-import org.elasticsearch.xpack.ml.job.JobManagerHolder;
-import org.elasticsearch.xpack.ml.process.NativeController;
-import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
 import org.elasticsearch.xpack.core.ml.stats.ForecastStats;
 import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator;
+import org.elasticsearch.xpack.ml.job.JobManagerHolder;
+import org.elasticsearch.xpack.ml.process.NativeController;
+import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
 
 import java.io.IOException;
 import java.util.Arrays;