浏览代码

[Transform] refactor cat transform to show more useful information (#68232)

Refactoring of cat transform to show more relevant information. The current cat transform shows a
lot of configuration details, however cat should show operationally useful information. This PR
changes the defaults and also adds when transform did a search last.
Hendrik Muhs 4 年之前
父节点
当前提交
cf08c0e6ab

+ 19 - 19
docs/reference/cat/transforms.asciidoc

@@ -57,35 +57,40 @@ specified columns.
 Valid columns are:
 
 `changes_last_detection_time`, `cldt`:::
+(Default)
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=checkpointing-changes-last-detected-at]
 
+`checkpoint`, `cp`:::
+(Default)
+The sequence number for the checkpoint.
+
 `checkpoint_duration_time_exp_avg`, `cdtea`, `checkpointTimeExpAvg`:::
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=exponential-avg-checkpoint-duration-ms]
 
-`create_time`, `ct`, `createTime`:::
+`checkpoint_progress`, `c`, `checkpointProgress`:::
 (Default)
+The progress of the next checkpoint that is currently in progress.
+
+`create_time`, `ct`, `createTime`:::
 The time the {transform} was created.
 
 `description`, `d`:::
-(Default)
 The description of the {transform}.
 
 `dest_index`, `di`, `destIndex`:::
-(Default)
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-index]
 
 `documents_indexed`, `doci`:::
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-indexed]
 
 `docs_per_second`, `dps`:::
-(Default)
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-docs-per-second]
 
 `documents_processed`, `docp`:::
+(Default)
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-processed]
 
 `frequency`, `f`:::
-(Default)
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=frequency]
 
 `id`:::
@@ -104,15 +109,17 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=index-total]
 `indexed_documents_exp_avg`, `idea`:::
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=exponential-avg-documents-indexed]
 
-`max_page_search_size`, `mpsz`:::
+`last_search_time`, `lst`, `lastSearchTime`:::
 (Default)
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=checkpointing-last-search-time]
+
+`max_page_search_size`, `mpsz`:::
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size]
 
 `pages_processed`, `pp`:::
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pages-processed]
 
 `pipeline`, `p`:::
-(Default)
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-pipeline]
 
 `processed_documents_exp_avg`, `pdea`:::
@@ -142,14 +149,12 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=source-index-transform
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=state-transform]
 
 `transform_type`, `tt`:::
-(Default)
 Indicates the type of {transform}: `batch` or `continuous`.
 
 `trigger_count`, `tc`:::
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=trigger-count]
 
 `version`, `v`:::
-(Default)
 The version of {es} that existed on the node when the {transform} was
 created.
 
@@ -179,16 +184,11 @@ GET /_cat/transforms?v=true&format=json
 [
   {
     "id" : "ecommerce_transform",
-    "create_time" : "2020-03-20T20:31:25.077Z",
-    "version" : "7.7.0",
-    "source_index" : "kibana_sample_data_ecommerce",
-    "dest_index" : "kibana_sample_data_ecommerce_transform",
-    "pipeline" : null,
-    "description" : "Maximum priced ecommerce data by customer_id in Asia",
-    "transform_type" : "continuous",
-    "frequency" : "5m",
-    "max_page_search_size" : "500",
-    "state" : "STARTED"
+    "state" : "started",
+    "checkpoint" : "1",
+    "documents_processed" : "705",
+    "checkpoint_progress" : "100.00",
+    "changes_last_detection_time" : null
   }
 ]
 ----

+ 5 - 0
docs/reference/rest-api/common-parms.asciidoc

@@ -102,6 +102,11 @@ tag::checkpointing-changes-last-detected-at[]
 The timestamp when changes were last detected in the source indices.
 end::checkpointing-changes-last-detected-at[]
 
+tag::checkpointing-last-search-time[]
+The timestamp of the last search in the source indices. This field is only
+shown if the transform is running.
+end::checkpointing-last-search-time[]
+
 tag::cluster-health-status[]
 (string)
 Health status of the cluster, based on the state of its primary and replica

+ 4 - 0
docs/reference/transform/apis/get-transform-stats.asciidoc

@@ -108,6 +108,10 @@ was created.
 =====
 //End checkpointing.last
 
+`last_search_time`:::
+(date)
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=checkpointing-last-search-time]
+
 //Begin checkpointing.next
 `next`:::
 (object) Contains statistics about the next checkpoint that is currently in

+ 5 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformStats.java

@@ -289,6 +289,11 @@ public class TransformStats implements Writeable, ToXContentObject {
             out.writeEnum(this);
         }
 
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+
         public String value() {
             return name().toLowerCase(Locale.ROOT);
         }

+ 83 - 11
x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_cat_apis.yml

@@ -54,6 +54,57 @@ setup:
             }
           }
 
+  - do:
+      index:
+        index: airline-data
+        id: 1
+        body: >
+          {
+            "time": "2017-02-18T00:00:00Z",
+            "airline": "airline1",
+            "responsetime": 1.0,
+            "event_rate": 5
+          }
+
+  - do:
+      index:
+        index: airline-data
+        id: 2
+        body: >
+          {
+            "time": "2017-02-18T00:30:00Z",
+            "airline": "airline1",
+            "responsetime": 1.0,
+            "event_rate": 6
+          }
+
+  - do:
+      index:
+        index: airline-data
+        id: 3
+        body: >
+          {
+            "time": "2017-02-18T01:00:00Z",
+            "airline": "airline2",
+            "responsetime": 42.0,
+            "event_rate": 8
+          }
+
+  - do:
+      index:
+        index: airline-data
+        id: 4
+        body: >
+          {
+            "time": "2017-02-18T01:01:00Z",
+            "airline": "airline1",
+            "responsetime": 42.0,
+            "event_rate": 7
+          }
+
+  - do:
+      indices.refresh:
+        index: airline-data
 ---
 teardown:
   - do:
@@ -70,9 +121,9 @@ teardown:
         transform_id: "airline-transform-*"
   - match:
       $body: |
-        /^  #id                           \s+ create_time \s+ version \s+ source_index \s+ dest_index              \s+ pipeline \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state    \n
-            (airline\-transform\-latest   \s+ [^\s]+      \s+ [^\s]+  \s+ airline-data \s+ airline-data-latest     \s+          \s+ batch          \s+ 1m        \s+ 500                  \s+ -               \s+ STOPPED \n)+
-            (airline\-transform\-stats    \s+ [^\s]+      \s+ [^\s]+  \s+ airline-data \s+ airline-data-by-airline \s+          \s+ batch          \s+ 1m        \s+ 500                  \s+ -               \s+ STOPPED \n)+  $/
+        /^  #id                           \s+ state   \s+ checkpoint \s+ documents_processed \s+ checkpoint_progress \s+ last_search_time \s+ changes_last_detection_time \n
+            (airline\-transform\-latest   \s+ stopped \s+ 0          \s+ 0                   \s+                     \s+                  \s+                             \n)+
+            (airline\-transform\-stats    \s+ stopped \s+ 0          \s+ 0                   \s+                     \s+                  \s+                             \n)+  $/
 
 ---
 "Test cat transform stats with column selection":
@@ -84,9 +135,8 @@ teardown:
   - match:
       $body: |
         /^   id                         \s+ version \s+ source_index \s+ dest_index              \s+ search_total \s+ index_total \s+ docp \s+ cdtea  \s+ indexed_documents_exp_avg \n
-            (airline\-transform-latest  \s+ [^\s]+  \s+ airline-data \s+ airline-data-latest     \s+ 0            \s+ 0           \s+ 0  \s+ 0.0    \s+ 0.0 \n)+
-            (airline\-transform-stats   \s+ [^\s]+  \s+ airline-data \s+ airline-data-by-airline \s+ 0            \s+ 0           \s+ 0  \s+ 0.0    \s+ 0.0 \n)+  $/
-
+            (airline\-transform-latest  \s+ [^\s]+  \s+ airline-data \s+ airline-data-latest     \s+ 0            \s+ 0           \s+ 0    \s+ 0.00   \s+ 0.00                      \n)+
+            (airline\-transform-stats   \s+ [^\s]+  \s+ airline-data \s+ airline-data-by-airline \s+ 0            \s+ 0           \s+ 0    \s+ 0.00   \s+ 0.00                      \n)+  $/
 
 ---
 "Test cat transform stats with batch transform":
@@ -97,7 +147,7 @@ teardown:
           {
             "source": {
               "index": ["airline-data", "airline-data-other"],
-              "query": {"bool":{"filter":{"term":{"airline":"foo"}}}}
+              "query": {"bool":{"filter":{"term":{"airline":"airline1"}}}}
             },
             "dest": { "index": "airline-data-by-airline-batch" },
             "pivot": {
@@ -112,8 +162,30 @@ teardown:
         v: true
   - match:
       $body: |
-        /^   id                        \s+ create_time \s+ version \s+ source_index                    \s+ dest_index                    \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state   \n
-            (airline\-transform\-batch \s+ [^\s]+      \s+ [^\s]+  \s+ airline-data,airline-data-other \s+ airline-data-by-airline-batch \s+          \s+ description \s+ batch          \s+ 1m        \s+ 500                  \s+ -               \s+ STOPPED \n)+  $/
+        /^   id                         \s+ state   \s+ checkpoint \s+ documents_processed \s+ checkpoint_progress \s+ last_search_time \s+ changes_last_detection_time \n
+            (airline\-transform\-batch  \s+ stopped \s+ 0          \s+ 0                   \s+                     \s+                  \s+                             \n)+  $/
+  - do:
+      transform.start_transform:
+        transform_id: "airline-transform-batch"
+  - match: { acknowledged: true }
+
+  - do:
+      transform.stop_transform:
+        wait_for_checkpoint: true
+        transform_id: "airline-transform-batch"
+        wait_for_completion: true
+  - match: { acknowledged: true }
+
+  - do:
+      cat.transforms:
+        transform_id: "airline-transform-batch"
+        v: true
+
+  # see gh#62204 despite wait_for_completion is true, it might still not be stopped
+  - match:
+      $body: |
+        /^   id                        \s+ state    \s+ checkpoint \s+ documents_processed \s+ checkpoint_progress \s+ last_search_time \s+ changes_last_detection_time \n
+            (airline\-transform\-batch  \s+ stop.*  \s+ 1          \s+ 3                   \s+ 100.00              \s+                  \s+ .*                          \n)+  $/
   - do:
       transform.delete_transform:
         transform_id: "airline-transform-batch"
@@ -148,8 +220,8 @@ teardown:
         v: true
   - match:
       $body: |
-        /^   id                             \s+ create_time \s+ version \s+ source_index                    \s+ dest_index                         \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state   \n
-            (airline\-transform\-continuous \s+ [^\s]+      \s+ [^\s]+  \s+ airline-data,airline-data-other \s+ airline-data-by-airline-continuous \s+          \s+ description \s+ continuous     \s+ 10s       \s+ 500                  \s+ -               \s+ STOPPED \n)+  $/
+        /^   id                             \s+ state   \s+ checkpoint \s+ documents_processed \s+ checkpoint_progress \s+ last_search_time \s+ changes_last_detection_time \n
+            (airline\-transform\-continuous \s+ stopped \s+ 0          \s+ 0                   \s+                     \s+                  \s+                             \n)+  $/
   - do:
       transform.delete_transform:
         transform_id: "airline-transform-continuous"

+ 80 - 31
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestCatTransformAction.java

@@ -26,7 +26,9 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
 import org.elasticsearch.xpack.transform.Transform;
 
+import java.util.Date;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -94,22 +96,8 @@ public class RestCatTransformAction extends AbstractCatAction {
 
     private static Table getTableWithHeader() {
         return new Table().startHeaders()
-            // Transform config info
+            // default columns
             .addCell("id", TableColumnAttributeBuilder.builder("the id").build())
-            .addCell("create_time", TableColumnAttributeBuilder.builder("transform creation time").setAliases("ct", "createTime").build())
-            .addCell(
-                "version",
-                TableColumnAttributeBuilder.builder("the version of Elasticsearch when the transform was created").setAliases("v").build()
-            )
-            .addCell("source_index", TableColumnAttributeBuilder.builder("source index").setAliases("si", "sourceIndex").build())
-            .addCell("dest_index", TableColumnAttributeBuilder.builder("destination index").setAliases("di", "destIndex").build())
-            .addCell("pipeline", TableColumnAttributeBuilder.builder("transform pipeline").setAliases("p").build())
-            .addCell("description", TableColumnAttributeBuilder.builder("description").setAliases("d").build())
-            .addCell("transform_type", TableColumnAttributeBuilder.builder("batch or continuous transform").setAliases("tt").build())
-            .addCell("frequency", TableColumnAttributeBuilder.builder("frequency of transform").setAliases("f").build())
-            .addCell("max_page_search_size", TableColumnAttributeBuilder.builder("max page search size").setAliases("mpsz").build())
-            .addCell("docs_per_second", TableColumnAttributeBuilder.builder("docs per second").setAliases("dps").build())
-            // Transform stats info
             .addCell(
                 "state",
                 TableColumnAttributeBuilder.builder("transform state")
@@ -117,11 +105,48 @@ public class RestCatTransformAction extends AbstractCatAction {
                     .setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
                     .build()
             )
-            .addCell("reason", TableColumnAttributeBuilder.builder("reason for the current state", false).setAliases("r", "reason").build())
+            .addCell("checkpoint", TableColumnAttributeBuilder.builder("checkpoint").setAliases("c").build())
+            .addCell(
+                "documents_processed",
+                TableColumnAttributeBuilder.builder("the number of documents read from source indices and processed")
+                    .setAliases("docp", "documentsProcessed")
+                    .build()
+            )
+            .addCell(
+                "checkpoint_progress",
+                TableColumnAttributeBuilder.builder("progress of the checkpoint").setAliases("cp", "checkpointProgress").build()
+            )
+            .addCell(
+                "last_search_time",
+                TableColumnAttributeBuilder.builder("last time transform searched for updates").setAliases("lst", "lastSearchTime").build()
+            )
             .addCell(
                 "changes_last_detection_time",
-                TableColumnAttributeBuilder.builder("changes last detected time", false).setAliases("cldt").build()
+                TableColumnAttributeBuilder.builder("changes last detected time").setAliases("cldt").build()
+            )
+
+            // optional columns
+            .addCell(
+                "create_time",
+                TableColumnAttributeBuilder.builder("transform creation time", false).setAliases("ct", "createTime").build()
+            )
+            .addCell(
+                "version",
+                TableColumnAttributeBuilder.builder("the version of Elasticsearch when the transform was created", false)
+                    .setAliases("v")
+                    .build()
             )
+            .addCell("source_index", TableColumnAttributeBuilder.builder("source index", false).setAliases("si", "sourceIndex").build())
+            .addCell("dest_index", TableColumnAttributeBuilder.builder("destination index", false).setAliases("di", "destIndex").build())
+            .addCell("pipeline", TableColumnAttributeBuilder.builder("transform pipeline", false).setAliases("p").build())
+            .addCell("description", TableColumnAttributeBuilder.builder("description", false).setAliases("d").build())
+            .addCell("transform_type", TableColumnAttributeBuilder.builder("batch or continuous transform", false).setAliases("tt").build())
+            .addCell("frequency", TableColumnAttributeBuilder.builder("frequency of transform", false).setAliases("f").build())
+            .addCell("max_page_search_size", TableColumnAttributeBuilder.builder("max page search size", false).setAliases("mpsz").build())
+            .addCell("docs_per_second", TableColumnAttributeBuilder.builder("docs per second", false).setAliases("dps").build())
+
+            .addCell("reason", TableColumnAttributeBuilder.builder("reason for the current state", false).setAliases("r", "reason").build())
+
             .addCell("search_total", TableColumnAttributeBuilder.builder("total number of search phases", false).setAliases("st").build())
             .addCell(
                 "search_failure",
@@ -137,15 +162,9 @@ public class RestCatTransformAction extends AbstractCatAction {
                 "index_time",
                 TableColumnAttributeBuilder.builder("total time spent indexing documents", false).setAliases("itime").build()
             )
-            .addCell(
-                "documents_processed",
-                TableColumnAttributeBuilder.builder("the number of documents read from source indices and processed", false)
-                    .setAliases("docp")
-                    .build()
-            )
             .addCell(
                 "documents_indexed",
-                TableColumnAttributeBuilder.builder("the number of documents index to the destination index", false)
+                TableColumnAttributeBuilder.builder("the number of documents written to the destination index", false)
                     .setAliases("doci")
                     .build()
             )
@@ -199,9 +218,30 @@ public class RestCatTransformAction extends AbstractCatAction {
                     : config.getPivotConfig().getMaxPageSearchSize()
                 : config.getSettings().getMaxPageSearchSize();
 
+            Double progress = checkpointingInfo == null ? null
+                : checkpointingInfo.getNext().getCheckpointProgress() == null ? null
+                : checkpointingInfo.getNext().getCheckpointProgress().getPercentComplete();
+
             table.startRow()
+                // default columns
                 .addCell(config.getId())
-                .addCell(config.getCreateTime())
+                .addCell(stats == null ? null : stats.getState().toString())
+                .addCell(checkpointingInfo == null ? null : checkpointingInfo.getLast().getCheckpoint())
+                .addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumDocuments())
+                .addCell(progress == null ? null : String.format(Locale.ROOT, "%.2f", progress))
+                .addCell(
+                    checkpointingInfo == null ? null
+                        : checkpointingInfo.getLastSearchTime() == null ? null
+                        : Date.from(checkpointingInfo.getLastSearchTime())
+                )
+                .addCell(
+                    checkpointingInfo == null ? null
+                        : checkpointingInfo.getChangesLastDetectedAt() == null ? null
+                        : Date.from(checkpointingInfo.getChangesLastDetectedAt())
+                )
+
+                // optional columns
+                .addCell(config.getCreateTime() == null ? null : Date.from(config.getCreateTime()))
                 .addCell(config.getVersion())
                 .addCell(String.join(",", config.getSource().getIndex()))
                 .addCell(config.getDestination().getIndex())
@@ -215,9 +255,7 @@ public class RestCatTransformAction extends AbstractCatAction {
                         ? "-"
                         : config.getSettings().getDocsPerSecond()
                 )
-                .addCell(stats == null ? null : stats.getState())
                 .addCell(stats == null ? null : stats.getReason())
-                .addCell(checkpointingInfo == null ? null : checkpointingInfo.getChangesLastDetectedAt())
                 .addCell(transformIndexerStats == null ? null : transformIndexerStats.getSearchTotal())
                 .addCell(transformIndexerStats == null ? null : transformIndexerStats.getSearchFailures())
                 .addCell(transformIndexerStats == null ? null : TimeValue.timeValueMillis(transformIndexerStats.getSearchTime()))
@@ -226,15 +264,26 @@ public class RestCatTransformAction extends AbstractCatAction {
                 .addCell(transformIndexerStats == null ? null : transformIndexerStats.getIndexFailures())
                 .addCell(transformIndexerStats == null ? null : TimeValue.timeValueMillis(transformIndexerStats.getIndexTime()))
 
-                .addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumDocuments())
                 .addCell(transformIndexerStats == null ? null : transformIndexerStats.getOutputDocuments())
                 .addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumInvocations())
                 .addCell(transformIndexerStats == null ? null : transformIndexerStats.getNumPages())
                 .addCell(transformIndexerStats == null ? null : TimeValue.timeValueMillis(transformIndexerStats.getProcessingTime()))
 
-                .addCell(transformIndexerStats == null ? null : transformIndexerStats.getExpAvgCheckpointDurationMs())
-                .addCell(transformIndexerStats == null ? null : transformIndexerStats.getExpAvgDocumentsIndexed())
-                .addCell(transformIndexerStats == null ? null : transformIndexerStats.getExpAvgDocumentsProcessed())
+                .addCell(
+                    transformIndexerStats == null
+                        ? null
+                        : String.format(Locale.ROOT, "%.2f", transformIndexerStats.getExpAvgCheckpointDurationMs())
+                )
+                .addCell(
+                    transformIndexerStats == null
+                        ? null
+                        : String.format(Locale.ROOT, "%.2f", transformIndexerStats.getExpAvgDocumentsIndexed())
+                )
+                .addCell(
+                    transformIndexerStats == null
+                        ? null
+                        : String.format(Locale.ROOT, "%.2f", transformIndexerStats.getExpAvgDocumentsProcessed())
+                )
                 .endRow();
         });