瀏覽代碼

[Transform] provide exponential_avg* stats for batch transforms (#52041)

provide exponential_avg* stats for batch transforms, avoids confusion why those values are all 0
otherwise
Hendrik Muhs 5 年之前
父節點
當前提交
34734ae15b

+ 87 - 33
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStats.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.transform.transforms;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
@@ -36,23 +37,34 @@ public class TransformIndexerStats extends IndexerJobStats {
     public static ParseField SEARCH_TOTAL = new ParseField("search_total");
     public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
     public static ParseField INDEX_FAILURES = new ParseField("index_failures");
-    public static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS =
-        new ParseField("exponential_avg_checkpoint_duration_ms");
-    public static ParseField EXPONENTIAL_AVG_DOCUMENTS_INDEXED =
-        new ParseField("exponential_avg_documents_indexed");
-    public static ParseField EXPONENTIAL_AVG_DOCUMENTS_PROCESSED =
-        new ParseField("exponential_avg_documents_processed");
+    public static ParseField EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS = new ParseField("exponential_avg_checkpoint_duration_ms");
+    public static ParseField EXPONENTIAL_AVG_DOCUMENTS_INDEXED = new ParseField("exponential_avg_documents_indexed");
+    public static ParseField EXPONENTIAL_AVG_DOCUMENTS_PROCESSED = new ParseField("exponential_avg_documents_processed");
 
     // This changes how much "weight" past calculations have.
     // The shorter the window, the less "smoothing" will occur.
     private static final int EXP_AVG_WINDOW = 10;
-    private static final double ALPHA = 2.0/(EXP_AVG_WINDOW + 1);
+    private static final double ALPHA = 2.0 / (EXP_AVG_WINDOW + 1);
 
     private static final ConstructingObjectParser<TransformIndexerStats, Void> LENIENT_PARSER = new ConstructingObjectParser<>(
-            NAME, true,
-            args -> new TransformIndexerStats(
-               (long) args[0], (long) args[1], (long) args[2], (long) args[3], (long) args[4], (long) args[5], (long) args[6],
-               (long) args[7], (long) args[8], (long) args[9], (Double) args[10], (Double) args[11], (Double) args[12]));
+        NAME,
+        true,
+        args -> new TransformIndexerStats(
+            (long) args[0],
+            (long) args[1],
+            (long) args[2],
+            (long) args[3],
+            (long) args[4],
+            (long) args[5],
+            (long) args[6],
+            (long) args[7],
+            (long) args[8],
+            (long) args[9],
+            (Double) args[10],
+            (Double) args[11],
+            (Double) args[12]
+        )
+    );
 
     static {
         LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
@@ -73,6 +85,7 @@ public class TransformIndexerStats extends IndexerJobStats {
     private double expAvgCheckpointDurationMs;
     private double expAvgDocumentsIndexed;
     private double expAvgDocumentsProcessed;
+
     /**
      * Create with all stats set to zero
      */
@@ -80,30 +93,54 @@ public class TransformIndexerStats extends IndexerJobStats {
         super();
     }
 
-    public TransformIndexerStats(long numPages, long numInputDocuments, long numOutputDocuments,
-                                          long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal,
-                                          long indexFailures, long searchFailures, Double expAvgCheckpointDurationMs,
-                                          Double expAvgDocumentsIndexed, Double expAvgDocumentsProcessed ) {
-        super(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
-            indexFailures, searchFailures);
+    public TransformIndexerStats(
+        long numPages,
+        long numInputDocuments,
+        long numOutputDocuments,
+        long numInvocations,
+        long indexTime,
+        long searchTime,
+        long indexTotal,
+        long searchTotal,
+        long indexFailures,
+        long searchFailures,
+        Double expAvgCheckpointDurationMs,
+        Double expAvgDocumentsIndexed,
+        Double expAvgDocumentsProcessed
+    ) {
+        super(
+            numPages,
+            numInputDocuments,
+            numOutputDocuments,
+            numInvocations,
+            indexTime,
+            searchTime,
+            indexTotal,
+            searchTotal,
+            indexFailures,
+            searchFailures
+        );
         this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs;
         this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed;
         this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed;
     }
 
-    public TransformIndexerStats(long numPages, long numInputDocuments, long numOutputDocuments,
-                                          long numInvocations, long indexTime, long searchTime, long indexTotal, long searchTotal,
-                                          long indexFailures, long searchFailures) {
-        this(numPages, numInputDocuments, numOutputDocuments, numInvocations, indexTime, searchTime, indexTotal, searchTotal,
-            indexFailures, searchFailures, 0.0, 0.0, 0.0);
-    }
-
     public TransformIndexerStats(TransformIndexerStats other) {
-        this(other.numPages, other.numInputDocuments, other.numOuputDocuments, other.numInvocations,
-            other.indexTime, other.searchTime, other.indexTotal, other.searchTotal, other.indexFailures, other.searchFailures);
-        this.expAvgCheckpointDurationMs = other.expAvgCheckpointDurationMs;
-        this.expAvgDocumentsIndexed = other.expAvgDocumentsIndexed;
-        this.expAvgDocumentsProcessed = other.expAvgDocumentsProcessed;
+        this(
+            other.numPages,
+            other.numInputDocuments,
+            other.numOuputDocuments,
+            other.numInvocations,
+            other.indexTime,
+            other.searchTime,
+            other.indexTotal,
+            other.searchTotal,
+            other.indexFailures,
+            other.searchFailures,
+            other.expAvgCheckpointDurationMs,
+            other.expAvgDocumentsIndexed,
+            other.expAvgDocumentsProcessed
+        );
     }
 
     public TransformIndexerStats(StreamInput in) throws IOException {
@@ -180,7 +217,7 @@ public class TransformIndexerStats extends IndexerJobStats {
     }
 
     private double calculateExpAvg(double previousExpValue, double alpha, long observedValue) {
-        return alpha * observedValue + (1-alpha) * previousExpValue;
+        return alpha * observedValue + (1 - alpha) * previousExpValue;
     }
 
     @Override
@@ -212,9 +249,26 @@ public class TransformIndexerStats extends IndexerJobStats {
 
     @Override
     public int hashCode() {
-        return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
-            indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal,
-            expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed);
+        return Objects.hash(
+            numPages,
+            numInputDocuments,
+            numOuputDocuments,
+            numInvocations,
+            indexTime,
+            searchTime,
+            indexFailures,
+            searchFailures,
+            indexTotal,
+            searchTotal,
+            expAvgCheckpointDurationMs,
+            expAvgDocumentsIndexed,
+            expAvgDocumentsProcessed
+        );
+    }
+
+    @Override
+    public String toString() {
+        return Strings.toString(this);
     }
 
     public static TransformIndexerStats fromXContent(XContentParser parser) {

+ 12 - 4
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformIndexerStatsTests.java

@@ -31,13 +31,21 @@ public class TransformIndexerStatsTests extends AbstractSerializingTestCase<Tran
     }
 
     public static TransformIndexerStats randomStats() {
-        return new TransformIndexerStats(randomLongBetween(10L, 10000L),
-            randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
-            randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
+        return new TransformIndexerStats(
+            randomLongBetween(10L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
+            randomLongBetween(0L, 10000L),
             randomLongBetween(0L, 10000L),
             randomBoolean() ? randomDouble() : null,
             randomBoolean() ? randomDouble() : null,
-            randomBoolean() ? randomDouble() : null);
+            randomBoolean() ? randomDouble() : null
+        );
     }
 
     public void testExpAvgIncrement() {

+ 1 - 1
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformStatsTests.java

@@ -69,7 +69,7 @@ public class TransformStatsTests extends AbstractSerializingTestCase<TransformSt
                 STARTED,
                 randomBoolean() ? null : randomAlphaOfLength(100),
                 randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
-                new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
+                new TransformIndexerStats(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0.0, 0.0, 0.0),
                 new TransformCheckpointingInfo(
                     new TransformCheckpointStats(0, null, null, 10, 100),
                     new TransformCheckpointStats(0, null, null, 100, 1000),

+ 68 - 54
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java

@@ -29,11 +29,9 @@ import static org.hamcrest.Matchers.oneOf;
 public class TransformGetAndGetStatsIT extends TransformRestTestCase {
 
     private static final String TEST_USER_NAME = "transform_user";
-    private static final String BASIC_AUTH_VALUE_TRANSFORM_USER =
-        basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING);
+    private static final String BASIC_AUTH_VALUE_TRANSFORM_USER = basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING);
     private static final String TEST_ADMIN_USER_NAME = "transform_admin";
-    private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN =
-        basicAuthHeaderValue(TEST_ADMIN_USER_NAME, TEST_PASSWORD_SECURE_STRING);
+    private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN = basicAuthHeaderValue(TEST_ADMIN_USER_NAME, TEST_PASSWORD_SECURE_STRING);
 
     private static boolean indicesCreated = false;
 
@@ -101,13 +99,13 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
         stats = entityAsMap(client().performRequest(getRequest));
         assertEquals(3, XContentMapValues.extractValue("count", stats));
 
-        List<Map<String, Object>> transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
+        List<Map<String, Object>> transformsStats = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", stats);
         // Verify that both transforms have valid stats
         for (Map<String, Object> transformStats : transformsStats) {
-            Map<String, Object> stat = (Map<String, Object>)transformStats.get("stats");
-            assertThat("documents_processed is not > 0.", ((Integer)stat.get("documents_processed")), greaterThan(0));
-            assertThat("search_total is not > 0.", ((Integer)stat.get("search_total")), greaterThan(0));
-            assertThat("pages_processed is not > 0.", ((Integer)stat.get("pages_processed")), greaterThan(0));
+            Map<String, Object> stat = (Map<String, Object>) transformStats.get("stats");
+            assertThat("documents_processed is not > 0.", ((Integer) stat.get("documents_processed")), greaterThan(0));
+            assertThat("search_total is not > 0.", ((Integer) stat.get("search_total")), greaterThan(0));
+            assertThat("pages_processed is not > 0.", ((Integer) stat.get("pages_processed")), greaterThan(0));
             /* TODO progress is now checkpoint progress and it may be that no checkpoint is in progress here
             Map<String, Object> progress =
                 (Map<String, Object>)XContentMapValues.extractValue("checkpointing.next.checkpoint_progress", transformStats);
@@ -122,7 +120,7 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
         stats = entityAsMap(client().performRequest(getRequest));
         assertEquals(1, XContentMapValues.extractValue("count", stats));
 
-        transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
+        transformsStats = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", stats);
         assertEquals(1, transformsStats.size());
         assertEquals("stopped", XContentMapValues.extractValue("state", transformsStats.get(0)));
         assertNull(XContentMapValues.extractValue("checkpointing.next.position", transformsStats.get(0)));
@@ -133,12 +131,11 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
         stats = entityAsMap(client().performRequest(getRequest));
         assertEquals(1, XContentMapValues.extractValue("count", stats));
 
-        transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
+        transformsStats = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", stats);
         assertEquals(1, transformsStats.size());
         assertThat(XContentMapValues.extractValue("state", transformsStats.get(0)), oneOf("started", "indexing"));
         assertEquals(1, XContentMapValues.extractValue("checkpointing.last.checkpoint", transformsStats.get(0)));
 
-
         // check all the different ways to retrieve all transforms
         getRequest = createRequestWithAuth("GET", getTransformEndpoint(), authHeader);
         Map<String, Object> transforms = entityAsMap(client().performRequest(getRequest));
@@ -165,12 +162,13 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
         stopTransform("pivot_stats_1", false);
 
         // Get rid of the first transform task, but keep the configuration
-        client().performRequest(new Request("POST", "_tasks/_cancel?actions="+TransformField.TASK_NAME+"*"));
+        client().performRequest(new Request("POST", "_tasks/_cancel?actions=" + TransformField.TASK_NAME + "*"));
 
         // Verify that the task is gone
-        Map<String, Object> tasks =
-            entityAsMap(client().performRequest(new Request("GET", "_tasks?actions="+TransformField.TASK_NAME+"*")));
-        assertTrue(((Map<?, ?>)XContentMapValues.extractValue("nodes", tasks)).isEmpty());
+        Map<String, Object> tasks = entityAsMap(
+            client().performRequest(new Request("GET", "_tasks?actions=" + TransformField.TASK_NAME + "*"))
+        );
+        assertTrue(((Map<?, ?>) XContentMapValues.extractValue("nodes", tasks)).isEmpty());
 
         createPivotReviewsTransform("pivot_stats_2", "pivot_reviews_stats_2", null);
         startAndWaitForTransform("pivot_stats_2", "pivot_reviews_stats_2");
@@ -178,13 +176,13 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
         Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + "_stats", BASIC_AUTH_VALUE_TRANSFORM_ADMIN);
         Map<String, Object> stats = entityAsMap(client().performRequest(getRequest));
         assertEquals(2, XContentMapValues.extractValue("count", stats));
-        List<Map<String, Object>> transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
+        List<Map<String, Object>> transformsStats = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", stats);
         // Verify that both transforms, the one with the task and the one without have statistics
         for (Map<String, Object> transformStats : transformsStats) {
-            Map<String, Object> stat = (Map<String, Object>)transformStats.get("stats");
-            assertThat(((Integer)stat.get("documents_processed")), greaterThan(0));
-            assertThat(((Integer)stat.get("search_total")), greaterThan(0));
-            assertThat(((Integer)stat.get("pages_processed")), greaterThan(0));
+            Map<String, Object> stat = (Map<String, Object>) transformStats.get("stats");
+            assertThat(((Integer) stat.get("documents_processed")), greaterThan(0));
+            assertThat(((Integer) stat.get("search_total")), greaterThan(0));
+            assertThat(((Integer) stat.get("pages_processed")), greaterThan(0));
         }
     }
 
@@ -202,13 +200,13 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
         Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId + "/_stats", authHeader);
         Map<String, Object> stats = entityAsMap(client().performRequest(getRequest));
         assertEquals(1, XContentMapValues.extractValue("count", stats));
-        List<Map<String, Object>> transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
+        List<Map<String, Object>> transformsStats = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", stats);
         // Verify that the transform has stats and the total docs process matches the expected
         for (Map<String, Object> transformStats : transformsStats) {
-            Map<String, Object> stat = (Map<String, Object>)transformStats.get("stats");
-            assertThat("documents_processed is not > 0.", ((Integer)stat.get("documents_processed")), greaterThan(0));
-            assertThat("search_total is not > 0.", ((Integer)stat.get("search_total")), greaterThan(0));
-            assertThat("pages_processed is not > 0.", ((Integer)stat.get("pages_processed")), greaterThan(0));
+            Map<String, Object> stat = (Map<String, Object>) transformStats.get("stats");
+            assertThat("documents_processed is not > 0.", ((Integer) stat.get("documents_processed")), greaterThan(0));
+            assertThat("search_total is not > 0.", ((Integer) stat.get("search_total")), greaterThan(0));
+            assertThat("pages_processed is not > 0.", ((Integer) stat.get("pages_processed")), greaterThan(0));
             /* TODO progress is now checkpoint progress and it may be that no checkpoint is in progress here
             Map<String, Object> progress =
                 (Map<String, Object>)XContentMapValues.extractValue("checkpointing.next.checkpoint_progress", transformStats);
@@ -226,8 +224,12 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
         String transformSrc = "reviews_cont_pivot_test";
         createReviewsIndex(transformSrc);
         final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null);
-        String config = "{ \"dest\": {\"index\":\"" + transformDest + "\"},"
-            + " \"source\": {\"index\":\"" + transformSrc + "\"},"
+        String config = "{ \"dest\": {\"index\":\""
+            + transformDest
+            + "\"},"
+            + " \"source\": {\"index\":\""
+            + transformSrc
+            + "\"},"
             + " \"frequency\": \"1s\","
             + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"1s\"}},"
             + " \"pivot\": {"
@@ -251,20 +253,28 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
 
         Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId + "/_stats", null);
         Map<String, Object> stats = entityAsMap(client().performRequest(getRequest));
-        List<Map<String, Object>> transformsStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", stats);
+        List<Map<String, Object>> transformsStats = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", stats);
         assertEquals(1, transformsStats.size());
-        // No continuous checkpoints have been seen and thus all exponential averages should be 0.0
+        // No continuous checkpoints have been seen and thus all exponential averages should be equal to the batch stats
         for (Map<String, Object> transformStats : transformsStats) {
-            transformStats = (Map<String, Object>)transformStats.get("stats");
-            assertThat("exponential_avg_checkpoint_duration_ms is not 0.0",
-                transformStats.get("exponential_avg_checkpoint_duration_ms"),
-                equalTo(0.0));
-            assertThat("exponential_avg_documents_indexed is not 0.0",
-                transformStats.get("exponential_avg_documents_indexed"),
-                equalTo(0.0));
-            assertThat("exponential_avg_documents_processed is not 0.0",
+            transformStats = (Map<String, Object>) transformStats.get("stats");
+            assertThat(transformStats.get("documents_processed"), equalTo(1000));
+            assertThat(transformStats.get("documents_indexed"), equalTo(27));
+            assertThat(
+                "exponential_avg_checkpoint_duration_ms is not 0.0",
+                (Double) transformStats.get("exponential_avg_checkpoint_duration_ms"),
+                greaterThan(0.0)
+            );
+            assertThat(
+                "exponential_avg_documents_indexed does not match documents_indexed",
+                (Double) transformStats.get("exponential_avg_documents_indexed"),
+                equalTo(((Integer) transformStats.get("documents_indexed")).doubleValue())
+            );
+            assertThat(
+                "exponential_avg_documents_processed does not match documents_processed",
                 transformStats.get("exponential_avg_documents_processed"),
-                equalTo(0.0));
+                equalTo(((Integer) transformStats.get("documents_processed")).doubleValue())
+            );
         }
 
         int numDocs = 10;
@@ -296,23 +306,27 @@ public class TransformGetAndGetStatsIT extends TransformRestTestCase {
         // We should now have exp avgs since we have processed a continuous checkpoint
         assertBusy(() -> {
             Map<String, Object> statsResponse = entityAsMap(client().performRequest(getRequest));
-            List<Map<String, Object>> contStats = (List<Map<String, Object>>)XContentMapValues.extractValue("transforms", statsResponse);
+            List<Map<String, Object>> contStats = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", statsResponse);
             assertEquals(1, contStats.size());
             for (Map<String, Object> transformStats : contStats) {
-                Map<String, Object> statsObj = (Map<String, Object>)transformStats.get("stats");
-                assertThat("exponential_avg_checkpoint_duration_ms is 0",
-                    (Double)statsObj.get("exponential_avg_checkpoint_duration_ms"),
-                    greaterThan(0.0));
-                assertThat("exponential_avg_documents_indexed is 0",
-                    (Double)statsObj.get("exponential_avg_documents_indexed"),
-                    greaterThan(0.0));
-                assertThat("exponential_avg_documents_processed is 0",
-                    (Double)statsObj.get("exponential_avg_documents_processed"),
-                    greaterThan(0.0));
-                Map<String, Object> checkpointing = (Map<String, Object>)transformStats.get("checkpointing");
-                assertThat("changes_last_detected_at is null",
-                    checkpointing.get("changes_last_detected_at"),
-                    is(notNullValue()));
+                Map<String, Object> statsObj = (Map<String, Object>) transformStats.get("stats");
+                assertThat(
+                    "exponential_avg_checkpoint_duration_ms is 0",
+                    (Double) statsObj.get("exponential_avg_checkpoint_duration_ms"),
+                    greaterThan(0.0)
+                );
+                assertThat(
+                    "exponential_avg_documents_indexed is 0",
+                    (Double) statsObj.get("exponential_avg_documents_indexed"),
+                    greaterThan(0.0)
+                );
+                assertThat(
+                    "exponential_avg_documents_processed is 0",
+                    (Double) statsObj.get("exponential_avg_documents_processed"),
+                    greaterThan(0.0)
+                );
+                Map<String, Object> checkpointing = (Map<String, Object>) transformStats.get("checkpointing");
+                assertThat("changes_last_detected_at is null", checkpointing.get("changes_last_detected_at"), is(notNullValue()));
             }
         }, 120, TimeUnit.SECONDS);
     }

+ 39 - 15
x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformUsageIT.java

@@ -52,10 +52,14 @@ public class TransformUsageIT extends TransformRestTestCase {
         startAndWaitForTransform("test_usage", "pivot_reviews");
         stopTransform("test_usage", false);
 
-        Request statsExistsRequest = new Request("GET",
-            TransformInternalIndexConstants.LATEST_INDEX_NAME+"/_search?q=" +
-                INDEX_DOC_TYPE.getPreferredName() + ":" +
-                TransformStoredDoc.NAME);
+        Request statsExistsRequest = new Request(
+            "GET",
+            TransformInternalIndexConstants.LATEST_INDEX_NAME
+                + "/_search?q="
+                + INDEX_DOC_TYPE.getPreferredName()
+                + ":"
+                + TransformStoredDoc.NAME
+        );
         // Verify that we have one stat document
         assertBusy(() -> {
             Map<String, Object> hasStatsMap = entityAsMap(client().performRequest(statsExistsRequest));
@@ -66,13 +70,21 @@ public class TransformUsageIT extends TransformRestTestCase {
 
         Request getRequest = new Request("GET", getTransformEndpoint() + "test_usage/_stats");
         Map<String, Object> stats = entityAsMap(client().performRequest(getRequest));
-        Map<String, Integer> expectedStats = new HashMap<>();
-        for(String statName : PROVIDED_STATS) {
+        Map<String, Double> expectedStats = new HashMap<>();
+        for (String statName : PROVIDED_STATS) {
             @SuppressWarnings("unchecked")
-            List<Integer> specificStatistic = ((List<Integer>)XContentMapValues.extractValue("transforms.stats." + statName, stats));
+            List<Object> specificStatistic = (List<Object>) (XContentMapValues.extractValue("transforms.stats." + statName, stats));
             assertNotNull(specificStatistic);
-            Integer statistic = (specificStatistic).get(0);
-            expectedStats.put(statName, statistic);
+            expectedStats.put(statName, extractStatsAsDouble(specificStatistic.get(0)));
+        }
+
+        getRequest = new Request("GET", getTransformEndpoint() + "test_usage_continuous/_stats");
+        stats = entityAsMap(client().performRequest(getRequest));
+        for (String statName : PROVIDED_STATS) {
+            @SuppressWarnings("unchecked")
+            List<Object> specificStatistic = (List<Object>) (XContentMapValues.extractValue("transforms.stats." + statName, stats));
+            assertNotNull(specificStatistic);
+            expectedStats.compute(statName, (key, value) -> value + extractStatsAsDouble(specificStatistic.get(0)));
         }
 
         // Simply because we wait for continuous to reach checkpoint 1, does not mean that the statistics are written yet.
@@ -85,20 +97,22 @@ public class TransformUsageIT extends TransformRestTestCase {
             assertEquals(3, XContentMapValues.extractValue("transform.transforms._all", statsMap));
             assertEquals(2, XContentMapValues.extractValue("transform.transforms.stopped", statsMap));
             assertEquals(1, XContentMapValues.extractValue("transform.transforms.started", statsMap));
-            for(String statName : PROVIDED_STATS) {
+            for (String statName : PROVIDED_STATS) {
                 if (statName.equals(TransformIndexerStats.INDEX_TIME_IN_MS.getPreferredName())
-                    ||statName.equals(TransformIndexerStats.SEARCH_TIME_IN_MS.getPreferredName())) {
+                    || statName.equals(TransformIndexerStats.SEARCH_TIME_IN_MS.getPreferredName())) {
                     continue;
                 }
-                assertEquals("Incorrect stat " +  statName,
-                    expectedStats.get(statName) * 2,
-                    XContentMapValues.extractValue("transform.stats." + statName, statsMap));
+                assertEquals(
+                    "Incorrect stat " + statName,
+                    expectedStats.get(statName).doubleValue(),
+                    extractStatsAsDouble(XContentMapValues.extractValue("transform.stats." + statName, statsMap)),
+                    0.0001
+                );
             }
             // Refresh the index so that statistics are searchable
             refreshIndex(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
         }, 60, TimeUnit.SECONDS);
 
-
         stopTransform("test_usage_continuous", false);
 
         usageResponse = client().performRequest(new Request("GET", "_xpack/usage"));
@@ -107,4 +121,14 @@ public class TransformUsageIT extends TransformRestTestCase {
         assertEquals(3, XContentMapValues.extractValue("transform.transforms._all", usageAsMap));
         assertEquals(3, XContentMapValues.extractValue("transform.transforms.stopped", usageAsMap));
     }
+
+    private double extractStatsAsDouble(Object statsObject) {
+        if (statsObject instanceof Integer) {
+            return ((Integer) statsObject).doubleValue();
+        } else if (statsObject instanceof Double) {
+            return (Double) statsObject;
+        }
+        fail("unexpected value type for stats");
+        return 0;
+    }
 }

+ 56 - 46
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/TransformInfoTransportAction.java

@@ -37,7 +37,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-
 public class TransformInfoTransportAction extends XPackInfoFeatureTransportAction {
 
     private final boolean enabled;
@@ -56,11 +55,17 @@ public class TransformInfoTransportAction extends XPackInfoFeatureTransportActio
         TransformIndexerStats.SEARCH_TOTAL.getPreferredName(),
         TransformIndexerStats.INDEX_FAILURES.getPreferredName(),
         TransformIndexerStats.SEARCH_FAILURES.getPreferredName(),
-    };
+        TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
+        TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_INDEXED.getPreferredName(),
+        TransformIndexerStats.EXPONENTIAL_AVG_DOCUMENTS_PROCESSED.getPreferredName(), };
 
     @Inject
-    public TransformInfoTransportAction(TransportService transportService, ActionFilters actionFilters,
-                                        Settings settings, XPackLicenseState licenseState) {
+    public TransformInfoTransportAction(
+        TransportService transportService,
+        ActionFilters actionFilters,
+        Settings settings,
+        XPackLicenseState licenseState
+    ) {
         super(XPackInfoFeatureAction.TRANSFORM.name(), transportService, actionFilters);
         this.enabled = XPackSettings.TRANSFORM_ENABLED.get(settings);
         this.licenseState = licenseState;
@@ -82,67 +87,72 @@ public class TransformInfoTransportAction extends XPackInfoFeatureTransportActio
     }
 
     static TransformIndexerStats parseSearchAggs(SearchResponse searchResponse) {
-        List<Long> statisticsList = new ArrayList<>(PROVIDED_STATS.length);
+        List<Double> statisticsList = new ArrayList<>(PROVIDED_STATS.length);
 
-        for(String statName : PROVIDED_STATS) {
+        for (String statName : PROVIDED_STATS) {
             Aggregation agg = searchResponse.getAggregations().get(statName);
 
             if (agg instanceof NumericMetricsAggregation.SingleValue) {
-                statisticsList.add((long)((NumericMetricsAggregation.SingleValue)agg).value());
+                statisticsList.add(((NumericMetricsAggregation.SingleValue) agg).value());
             } else {
-                statisticsList.add(0L);
+                statisticsList.add(0.0);
             }
         }
-        return new TransformIndexerStats(statisticsList.get(0),  // numPages
-            statisticsList.get(1),  // numInputDocuments
-            statisticsList.get(2),  // numOutputDocuments
-            statisticsList.get(3),  // numInvocations
-            statisticsList.get(4),  // indexTime
-            statisticsList.get(5),  // searchTime
-            statisticsList.get(6),  // indexTotal
-            statisticsList.get(7),  // searchTotal
-            statisticsList.get(8),  // indexFailures
-            statisticsList.get(9)); // searchFailures
+        return new TransformIndexerStats(
+            statisticsList.get(0).longValue(),  // numPages
+            statisticsList.get(1).longValue(),  // numInputDocuments
+            statisticsList.get(2).longValue(),  // numOutputDocuments
+            statisticsList.get(3).longValue(),  // numInvocations
+            statisticsList.get(4).longValue(),  // indexTime
+            statisticsList.get(5).longValue(),  // searchTime
+            statisticsList.get(6).longValue(),  // indexTotal
+            statisticsList.get(7).longValue(),  // searchTotal
+            statisticsList.get(8).longValue(),  // indexFailures
+            statisticsList.get(9).longValue(), // searchFailures
+            statisticsList.get(10), // exponential_avg_checkpoint_duration_ms
+            statisticsList.get(11), // exponential_avg_documents_indexed
+            statisticsList.get(12)  // exponential_avg_documents_processed
+        );
     }
 
     static void getStatisticSummations(Client client, ActionListener<TransformIndexerStats> statsListener) {
-        QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
-            .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(),
-                    TransformStoredDoc.NAME)));
+        QueryBuilder queryBuilder = QueryBuilders.constantScoreQuery(
+            QueryBuilders.boolQuery()
+                .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformStoredDoc.NAME))
+        );
 
-        SearchRequestBuilder requestBuilder = client
-            .prepareSearch(
-                TransformInternalIndexConstants.INDEX_NAME_PATTERN,
-                TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED)
-            .setSize(0)
-            .setQuery(queryBuilder);
+        SearchRequestBuilder requestBuilder = client.prepareSearch(
+            TransformInternalIndexConstants.INDEX_NAME_PATTERN,
+            TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
+        ).setSize(0).setQuery(queryBuilder);
 
         final String path = TransformField.STATS_FIELD.getPreferredName() + ".";
-        for(String statName : PROVIDED_STATS) {
+        for (String statName : PROVIDED_STATS) {
             requestBuilder.addAggregation(AggregationBuilders.sum(statName).field(path + statName));
         }
 
-        ActionListener<SearchResponse> getStatisticSummationsListener = ActionListener.wrap(
-            searchResponse -> {
-                if (searchResponse.getShardFailures().length > 0) {
-                    logger.error("statistics summations search returned shard failures: {}",
-                        Arrays.toString(searchResponse.getShardFailures()));
-                }
-
-                statsListener.onResponse(parseSearchAggs(searchResponse));
-            },
-            failure -> {
-                if (failure instanceof ResourceNotFoundException) {
-                    statsListener.onResponse(new TransformIndexerStats());
-                } else {
-                    statsListener.onFailure(failure);
-                }
+        ActionListener<SearchResponse> getStatisticSummationsListener = ActionListener.wrap(searchResponse -> {
+            if (searchResponse.getShardFailures().length > 0) {
+                logger.error(
+                    "statistics summations search returned shard failures: {}",
+                    Arrays.toString(searchResponse.getShardFailures())
+                );
             }
-        );
-        ClientHelper.executeAsyncWithOrigin(client.threadPool().getThreadContext(),
+
+            statsListener.onResponse(parseSearchAggs(searchResponse));
+        }, failure -> {
+            if (failure instanceof ResourceNotFoundException) {
+                statsListener.onResponse(new TransformIndexerStats());
+            } else {
+                statsListener.onFailure(failure);
+            }
+        });
+        ClientHelper.executeAsyncWithOrigin(
+            client.threadPool().getThreadContext(),
             ClientHelper.TRANSFORM_ORIGIN,
             requestBuilder.request(),
             getStatisticSummationsListener,
-            client::search);
+            client::search
+        );
     }
 }

+ 2 - 3
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java

@@ -359,9 +359,8 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
             if (progress != null && progress.getPercentComplete() != null && progress.getPercentComplete() < 100.0) {
                 progress.incrementDocsProcessed(progress.getTotalDocs() - progress.getDocumentsProcessed());
             }
-            // If the last checkpoint is now greater than 1, that means that we have just processed the first
-            // continuous checkpoint and should start recording the exponential averages
-            if (lastCheckpoint != null && lastCheckpoint.getCheckpoint() > 1) {
+
+            if (lastCheckpoint != null) {
                 long docsIndexed = 0;
                 long docsProcessed = 0;
                 // This should not happen as we simply create a new one when we reach continuous checkpoints

+ 32 - 9
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/TransformInfoTransportActionTests.java

@@ -51,7 +51,11 @@ public class TransformInfoTransportActionTests extends ESTestCase {
 
     public void testAvailable() {
         TransformInfoTransportAction featureSet = new TransformInfoTransportAction(
-            mock(TransportService.class), mock(ActionFilters.class), Settings.EMPTY, licenseState);
+            mock(TransportService.class),
+            mock(ActionFilters.class),
+            Settings.EMPTY,
+            licenseState
+        );
         boolean available = randomBoolean();
         when(licenseState.isTransformAllowed()).thenReturn(available);
         assertThat(featureSet.available(), is(available));
@@ -62,13 +66,21 @@ public class TransformInfoTransportActionTests extends ESTestCase {
         Settings.Builder settings = Settings.builder();
         settings.put("xpack.transform.enabled", enabled);
         TransformInfoTransportAction featureSet = new TransformInfoTransportAction(
-            mock(TransportService.class), mock(ActionFilters.class), settings.build(), licenseState);
+            mock(TransportService.class),
+            mock(ActionFilters.class),
+            settings.build(),
+            licenseState
+        );
         assertThat(featureSet.enabled(), is(enabled));
     }
 
     public void testEnabledDefault() {
         TransformInfoTransportAction featureSet = new TransformInfoTransportAction(
-            mock(TransportService.class), mock(ActionFilters.class), Settings.EMPTY, licenseState);
+            mock(TransportService.class),
+            mock(ActionFilters.class),
+            Settings.EMPTY,
+            licenseState
+        );
         assertTrue(featureSet.enabled());
     }
 
@@ -77,8 +89,7 @@ public class TransformInfoTransportActionTests extends ESTestCase {
         SearchResponse withEmptyAggs = mock(SearchResponse.class);
         when(withEmptyAggs.getAggregations()).thenReturn(emptyAggs);
 
-        assertThat(TransformInfoTransportAction.parseSearchAggs(withEmptyAggs),
-            equalTo(new TransformIndexerStats()));
+        assertThat(TransformInfoTransportAction.parseSearchAggs(withEmptyAggs), equalTo(new TransformIndexerStats()));
 
         TransformIndexerStats expectedStats = new TransformIndexerStats(
             1,  // numPages
@@ -90,12 +101,16 @@ public class TransformInfoTransportActionTests extends ESTestCase {
             7,  // indexTotal
             8,  // searchTotal
             9,  // indexFailures
-            10); // searchFailures
+            10, // searchFailures
+            11.0,  // exponential_avg_checkpoint_duration_ms
+            12.0,  // exponential_avg_documents_indexed
+            13.0   // exponential_avg_documents_processed
+        );
 
         int currentStat = 1;
         List<Aggregation> aggs = new ArrayList<>(PROVIDED_STATS.length);
         for (String statName : PROVIDED_STATS) {
-            aggs.add(buildAgg(statName, (double) currentStat++));
+            aggs.add(buildAgg(statName, currentStat++));
         }
         Aggregations aggregations = new Aggregations(aggs);
         SearchResponse withAggs = mock(SearchResponse.class);
@@ -115,8 +130,16 @@ public class TransformInfoTransportActionTests extends ESTestCase {
         when(licenseState.isTransformAllowed()).thenReturn(true);
         Settings.Builder settings = Settings.builder();
         settings.put("xpack.transform.enabled", false);
-        var usageAction = new TransformUsageTransportAction(mock(TransportService.class), null, null,
-            mock(ActionFilters.class), null, settings.build(), licenseState, mock(Client.class));
+        var usageAction = new TransformUsageTransportAction(
+            mock(TransportService.class),
+            null,
+            null,
+            mock(ActionFilters.class),
+            null,
+            settings.build(),
+            licenseState,
+            mock(Client.class)
+        );
         PlainActionFuture<XPackUsageFeatureResponse> future = new PlainActionFuture<>();
         usageAction.masterOperation(null, null, mock(ClusterState.class), future);
         XPackFeatureSet.Usage usage = future.get().getUsage();