浏览代码

[Transform] fix page size return in cat transform, add dps (#57871)

fixes the page size reported after moving page size to settings(#56007) and
adds documents per second(throttling) to the output.

fixes #56498
Hendrik Muhs 5 年之前
父节点
当前提交
75ea6a4b17

+ 8 - 4
docs/reference/cat/transforms.asciidoc

@@ -20,9 +20,9 @@ Returns configuration and usage information about {transforms}.
 [[cat-transforms-api-prereqs]]
 [[cat-transforms-api-prereqs]]
 ==== {api-prereq-title}
 ==== {api-prereq-title}
 
 
-* If the {es} {security-features} are enabled, you must have `monitor_transform` 
-cluster privileges to use this API. The built-in `transform_user` role has these 
-privileges. For more information, see <<security-privileges>> and 
+* If the {es} {security-features} are enabled, you must have `monitor_transform`
+cluster privileges to use this API. The built-in `transform_user` role has these
+privileges. For more information, see <<security-privileges>> and
 <<built-in-roles>>.
 <<built-in-roles>>.
 
 
 //[[cat-transforms-api-desc]]
 //[[cat-transforms-api-desc]]
@@ -77,6 +77,10 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-index]
 `documents_indexed`, `doci`:::
 `documents_indexed`, `doci`:::
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-indexed]
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-indexed]
 
 
+`docs_per_second`, `dps`:::
+(Default)
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-docs-per-second]
+
 `documents_processed`, `docp`:::
 `documents_processed`, `docp`:::
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-processed]
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-processed]
 
 
@@ -139,7 +143,7 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=state-transform]
 
 
 `transform_type`, `tt`:::
 `transform_type`, `tt`:::
 (Default)
 (Default)
-Indicates the type of {transform}: `batch` or `continuous`. 
+Indicates the type of {transform}: `batch` or `continuous`.
 
 
 `trigger_count`, `tc`:::
 `trigger_count`, `tc`:::
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=trigger-count]
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=trigger-count]

+ 6 - 6
x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_cat_apis.yml

@@ -55,8 +55,8 @@ teardown:
         transform_id: "airline-transform-stats"
         transform_id: "airline-transform-stats"
   - match:
   - match:
       $body: |
       $body: |
-        /^  #id                           \s+ create_time \s+ version \s+ source_index \s+ dest_index              \s+ pipeline \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state    \n
-            (airline\-transform\-stats    \s+ [^\s]+      \s+ [^\s]+  \s+ airline-data \s+ airline-data-by-airline \s+          \s+ batch          \s+ 1m        \s+ 500                  \s+ STOPPED \n)+  $/
+        /^  #id                           \s+ create_time \s+ version \s+ source_index \s+ dest_index              \s+ pipeline \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state    \n
+            (airline\-transform\-stats    \s+ [^\s]+      \s+ [^\s]+  \s+ airline-data \s+ airline-data-by-airline \s+          \s+ batch          \s+ 1m        \s+ 500                  \s+ -               \s+ STOPPED \n)+  $/
 
 
 ---
 ---
 "Test cat transform stats with column selection":
 "Test cat transform stats with column selection":
@@ -95,8 +95,8 @@ teardown:
         v: true
         v: true
   - match:
   - match:
       $body: |
       $body: |
-        /^   id                        \s+ create_time \s+ version \s+ source_index                    \s+ dest_index                    \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state   \n
-            (airline\-transform\-batch \s+ [^\s]+      \s+ [^\s]+  \s+ airline-data,airline-data-other \s+ airline-data-by-airline-batch \s+          \s+ description \s+ batch          \s+ 1m        \s+ 500                  \s+ STOPPED \n)+  $/
+        /^   id                        \s+ create_time \s+ version \s+ source_index                    \s+ dest_index                    \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state   \n
+            (airline\-transform\-batch \s+ [^\s]+      \s+ [^\s]+  \s+ airline-data,airline-data-other \s+ airline-data-by-airline-batch \s+          \s+ description \s+ batch          \s+ 1m        \s+ 500                  \s+ -               \s+ STOPPED \n)+  $/
   - do:
   - do:
       transform.delete_transform:
       transform.delete_transform:
         transform_id: "airline-transform-batch"
         transform_id: "airline-transform-batch"
@@ -131,8 +131,8 @@ teardown:
         v: true
         v: true
   - match:
   - match:
       $body: |
       $body: |
-        /^   id                             \s+ create_time \s+ version \s+ source_index                    \s+ dest_index                         \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state   \n
-            (airline\-transform\-continuous \s+ [^\s]+      \s+ [^\s]+  \s+ airline-data,airline-data-other \s+ airline-data-by-airline-continuous \s+          \s+ description \s+ continuous     \s+ 10s       \s+ 500                  \s+ STOPPED \n)+  $/
+        /^   id                             \s+ create_time \s+ version \s+ source_index                    \s+ dest_index                         \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state   \n
+            (airline\-transform\-continuous \s+ [^\s]+      \s+ [^\s]+  \s+ airline-data,airline-data-other \s+ airline-data-by-airline-continuous \s+          \s+ description \s+ continuous     \s+ 10s       \s+ 500                  \s+ -               \s+ STOPPED \n)+  $/
   - do:
   - do:
       transform.delete_transform:
       transform.delete_transform:
         transform_id: "airline-transform-continuous"
         transform_id: "airline-transform-continuous"

+ 4 - 6
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.settings.SettingsModule;
 import org.elasticsearch.common.settings.SettingsModule;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry.Entry;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry.Entry;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.Environment;
@@ -128,6 +129,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
     private final SetOnce<TransformServices> transformServices = new SetOnce<>();
     private final SetOnce<TransformServices> transformServices = new SetOnce<>();
 
 
     public static final int DEFAULT_FAILURE_RETRIES = 10;
     public static final int DEFAULT_FAILURE_RETRIES = 10;
+    public static final Integer DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
+    public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000);
 
 
     // How many times the transform task can retry on an non-critical failure
     // How many times the transform task can retry on an non-critical failure
     public static final Setting<Integer> NUM_FAILURE_RETRIES_SETTING = Setting.intSetting(
     public static final Setting<Integer> NUM_FAILURE_RETRIES_SETTING = Setting.intSetting(
@@ -264,12 +267,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
     ) {
     ) {
         TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry);
         TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry);
         TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName());
         TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName());
-        TransformCheckpointService checkpointService = new TransformCheckpointService(
-            settings,
-            clusterService,
-            configManager,
-            auditor
-        );
+        TransformCheckpointService checkpointService = new TransformCheckpointService(settings, clusterService, configManager, auditor);
         SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC());
         SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC());
 
 
         transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));
         transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));

+ 98 - 121
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestCatTransformAction.java

@@ -10,20 +10,21 @@ import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Table;
 import org.elasticsearch.common.Table;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.xpack.core.action.util.PageParams;
-import org.elasticsearch.xpack.core.common.table.TableColumnAttributeBuilder;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.action.RestActionListener;
 import org.elasticsearch.rest.action.RestActionListener;
 import org.elasticsearch.rest.action.RestResponseListener;
 import org.elasticsearch.rest.action.RestResponseListener;
 import org.elasticsearch.rest.action.cat.AbstractCatAction;
 import org.elasticsearch.rest.action.cat.AbstractCatAction;
 import org.elasticsearch.rest.action.cat.RestTable;
 import org.elasticsearch.rest.action.cat.RestTable;
+import org.elasticsearch.xpack.core.action.util.PageParams;
+import org.elasticsearch.xpack.core.common.table.TableColumnAttributeBuilder;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.TransformField;
 import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
 import org.elasticsearch.xpack.core.transform.action.GetTransformAction;
 import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
 import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction;
 import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
 import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
+import org.elasticsearch.xpack.transform.Transform;
 
 
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
@@ -35,14 +36,9 @@ import static org.elasticsearch.xpack.core.transform.TransformField.ALLOW_NO_MAT
 
 
 public class RestCatTransformAction extends AbstractCatAction {
 public class RestCatTransformAction extends AbstractCatAction {
 
 
-    private static final Integer DEFAULT_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
-    private static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000);
-
     @Override
     @Override
     public List<Route> routes() {
     public List<Route> routes() {
-        return List.of(
-            new Route(GET, "_cat/transforms"),
-            new Route(GET, "_cat/transforms/{" + TransformField.TRANSFORM_ID + "}"));
+        return List.of(new Route(GET, "_cat/transforms"), new Route(GET, "_cat/transforms/{" + TransformField.TRANSFORM_ID + "}"));
     }
     }
 
 
     @Override
     @Override
@@ -64,8 +60,10 @@ public class RestCatTransformAction extends AbstractCatAction {
         statsRequest.setAllowNoMatch(restRequest.paramAsBoolean(ALLOW_NO_MATCH.getPreferredName(), true));
         statsRequest.setAllowNoMatch(restRequest.paramAsBoolean(ALLOW_NO_MATCH.getPreferredName(), true));
 
 
         if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
         if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) {
-            PageParams pageParams = new PageParams(restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
-                                                    restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE));
+            PageParams pageParams = new PageParams(
+                restRequest.paramAsInt(PageParams.FROM.getPreferredName(), PageParams.DEFAULT_FROM),
+                restRequest.paramAsInt(PageParams.SIZE.getPreferredName(), PageParams.DEFAULT_SIZE)
+            );
             request.setPageParams(pageParams);
             request.setPageParams(pageParams);
             statsRequest.setPageParams(pageParams);
             statsRequest.setPageParams(pageParams);
         }
         }
@@ -95,138 +93,113 @@ public class RestCatTransformAction extends AbstractCatAction {
     }
     }
 
 
     private static Table getTableWithHeader() {
     private static Table getTableWithHeader() {
-        return new Table()
-            .startHeaders()
+        return new Table().startHeaders()
             // Transform config info
             // Transform config info
             .addCell("id", TableColumnAttributeBuilder.builder("the id").build())
             .addCell("id", TableColumnAttributeBuilder.builder("the id").build())
-            .addCell("create_time",
-                TableColumnAttributeBuilder.builder("transform creation time")
-                    .setAliases("ct", "createTime")
-                    .build())
-            .addCell("version",
-                TableColumnAttributeBuilder.builder("the version of Elasticsearch when the transform was created")
-                    .setAliases("v")
-                    .build())
-            .addCell("source_index",
-                TableColumnAttributeBuilder.builder("source index")
-                    .setAliases("si", "sourceIndex")
-                    .build())
-            .addCell("dest_index",
-                TableColumnAttributeBuilder.builder("destination index")
-                    .setAliases("di", "destIndex")
-                    .build())
-            .addCell("pipeline",
-                TableColumnAttributeBuilder.builder("transform pipeline")
-                    .setAliases("p")
-                    .build())
-            .addCell("description",
-                TableColumnAttributeBuilder.builder("description")
-                    .setAliases("d")
-                    .build())
-            .addCell("transform_type",
-                TableColumnAttributeBuilder.builder("batch or continuous transform")
-                    .setAliases("tt")
-                    .build())
-            .addCell("frequency",
-                TableColumnAttributeBuilder.builder("frequency of transform")
-                    .setAliases("f")
-                    .build())
-            .addCell("max_page_search_size",
-                TableColumnAttributeBuilder.builder("max page search size")
-                    .setAliases("mpsz")
-                    .build())
-
+            .addCell("create_time", TableColumnAttributeBuilder.builder("transform creation time").setAliases("ct", "createTime").build())
+            .addCell(
+                "version",
+                TableColumnAttributeBuilder.builder("the version of Elasticsearch when the transform was created").setAliases("v").build()
+            )
+            .addCell("source_index", TableColumnAttributeBuilder.builder("source index").setAliases("si", "sourceIndex").build())
+            .addCell("dest_index", TableColumnAttributeBuilder.builder("destination index").setAliases("di", "destIndex").build())
+            .addCell("pipeline", TableColumnAttributeBuilder.builder("transform pipeline").setAliases("p").build())
+            .addCell("description", TableColumnAttributeBuilder.builder("description").setAliases("d").build())
+            .addCell("transform_type", TableColumnAttributeBuilder.builder("batch or continuous transform").setAliases("tt").build())
+            .addCell("frequency", TableColumnAttributeBuilder.builder("frequency of transform").setAliases("f").build())
+            .addCell("max_page_search_size", TableColumnAttributeBuilder.builder("max page search size").setAliases("mpsz").build())
+            .addCell("docs_per_second", TableColumnAttributeBuilder.builder("docs per second").setAliases("dps").build())
             // Transform stats info
             // Transform stats info
-            .addCell("state",
+            .addCell(
+                "state",
                 TableColumnAttributeBuilder.builder("transform state")
                 TableColumnAttributeBuilder.builder("transform state")
                     .setAliases("s")
                     .setAliases("s")
                     .setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
                     .setTextAlignment(TableColumnAttributeBuilder.TextAlign.RIGHT)
-                    .build())
-            .addCell("reason",
-                TableColumnAttributeBuilder.builder("reason for the current state", false)
-                    .setAliases("r", "reason")
-                    .build())
-            .addCell("changes_last_detection_time",
-                TableColumnAttributeBuilder.builder("changes last detected time", false)
-                    .setAliases("cldt")
-                    .build())
-            .addCell("search_total",
-                TableColumnAttributeBuilder.builder("total number of search phases", false)
-                    .setAliases("st")
-                    .build())
-            .addCell("search_failure",
-                TableColumnAttributeBuilder.builder("total number of search failures", false)
-                    .setAliases("sf")
-                    .build())
-            .addCell("search_time",
-                TableColumnAttributeBuilder.builder("total search time", false)
-                    .setAliases("stime")
-                    .build())
-            .addCell("index_total",
-                TableColumnAttributeBuilder.builder("total number of index phases done by the transform", false)
-                    .setAliases("it")
-                    .build())
-            .addCell("index_failure",
-                TableColumnAttributeBuilder.builder("total number of index failures", false)
-                    .setAliases("if")
-                    .build())
-            .addCell("index_time",
-                TableColumnAttributeBuilder.builder("total time spent indexing documents", false)
-                    .setAliases("itime")
-                    .build())
-            .addCell("documents_processed",
-                TableColumnAttributeBuilder.builder("the number of documents read from source indices and processed",
-                    false)
+                    .build()
+            )
+            .addCell("reason", TableColumnAttributeBuilder.builder("reason for the current state", false).setAliases("r", "reason").build())
+            .addCell(
+                "changes_last_detection_time",
+                TableColumnAttributeBuilder.builder("changes last detected time", false).setAliases("cldt").build()
+            )
+            .addCell("search_total", TableColumnAttributeBuilder.builder("total number of search phases", false).setAliases("st").build())
+            .addCell(
+                "search_failure",
+                TableColumnAttributeBuilder.builder("total number of search failures", false).setAliases("sf").build()
+            )
+            .addCell("search_time", TableColumnAttributeBuilder.builder("total search time", false).setAliases("stime").build())
+            .addCell(
+                "index_total",
+                TableColumnAttributeBuilder.builder("total number of index phases done by the transform", false).setAliases("it").build()
+            )
+            .addCell("index_failure", TableColumnAttributeBuilder.builder("total number of index failures", false).setAliases("if").build())
+            .addCell(
+                "index_time",
+                TableColumnAttributeBuilder.builder("total time spent indexing documents", false).setAliases("itime").build()
+            )
+            .addCell(
+                "documents_processed",
+                TableColumnAttributeBuilder.builder("the number of documents read from source indices and processed", false)
                     .setAliases("docp")
                     .setAliases("docp")
-                    .build())
-            .addCell("documents_indexed",
-                TableColumnAttributeBuilder.builder("the number of documents index to the destination index",
-                    false)
+                    .build()
+            )
+            .addCell(
+                "documents_indexed",
+                TableColumnAttributeBuilder.builder("the number of documents index to the destination index", false)
                     .setAliases("doci")
                     .setAliases("doci")
-                    .build())
-            .addCell("trigger_count",
-                TableColumnAttributeBuilder.builder("the number of times the transform has been triggered", false)
-                    .setAliases("tc")
-                    .build())
-            .addCell("pages_processed",
-                TableColumnAttributeBuilder.builder("the number of pages processed", false)
-                    .setAliases("pp")
-                    .build())
-            .addCell("processing_time",
-                TableColumnAttributeBuilder.builder("the total time spent processing documents", false)
-                    .setAliases("pt")
-                    .build())
-            .addCell("checkpoint_duration_time_exp_avg",
+                    .build()
+            )
+            .addCell(
+                "trigger_count",
+                TableColumnAttributeBuilder.builder("the number of times the transform has been triggered", false).setAliases("tc").build()
+            )
+            .addCell(
+                "pages_processed",
+                TableColumnAttributeBuilder.builder("the number of pages processed", false).setAliases("pp").build()
+            )
+            .addCell(
+                "processing_time",
+                TableColumnAttributeBuilder.builder("the total time spent processing documents", false).setAliases("pt").build()
+            )
+            .addCell(
+                "checkpoint_duration_time_exp_avg",
                 TableColumnAttributeBuilder.builder("exponential average checkpoint processing time (milliseconds)", false)
                 TableColumnAttributeBuilder.builder("exponential average checkpoint processing time (milliseconds)", false)
                     .setAliases("cdtea", "checkpointTimeExpAvg")
                     .setAliases("cdtea", "checkpointTimeExpAvg")
-                    .build())
-            .addCell("indexed_documents_exp_avg",
-                TableColumnAttributeBuilder.builder("exponential average number of documents indexed", false)
-                    .setAliases("idea")
-                    .build())
-            .addCell("processed_documents_exp_avg",
-                TableColumnAttributeBuilder.builder("exponential average number of documents processed", false)
-                    .setAliases("pdea")
-                    .build())
+                    .build()
+            )
+            .addCell(
+                "indexed_documents_exp_avg",
+                TableColumnAttributeBuilder.builder("exponential average number of documents indexed", false).setAliases("idea").build()
+            )
+            .addCell(
+                "processed_documents_exp_avg",
+                TableColumnAttributeBuilder.builder("exponential average number of documents processed", false).setAliases("pdea").build()
+            )
             .endHeaders();
             .endHeaders();
     }
     }
 
 
     private Table buildTable(GetTransformAction.Response response, GetTransformStatsAction.Response statsResponse) {
     private Table buildTable(GetTransformAction.Response response, GetTransformStatsAction.Response statsResponse) {
         Table table = getTableWithHeader();
         Table table = getTableWithHeader();
-        Map<String, TransformStats> statsById = statsResponse.getTransformsStats().stream()
-                                                                .collect(Collectors.toMap(TransformStats::getId, Function.identity()));
+        Map<String, TransformStats> statsById = statsResponse.getTransformsStats()
+            .stream()
+            .collect(Collectors.toMap(TransformStats::getId, Function.identity()));
         response.getTransformConfigurations().forEach(config -> {
         response.getTransformConfigurations().forEach(config -> {
             TransformStats stats = statsById.get(config.getId());
             TransformStats stats = statsById.get(config.getId());
             TransformCheckpointingInfo checkpointingInfo = null;
             TransformCheckpointingInfo checkpointingInfo = null;
             TransformIndexerStats transformIndexerStats = null;
             TransformIndexerStats transformIndexerStats = null;
 
 
-            if(stats != null) {
+            if (stats != null) {
                 checkpointingInfo = stats.getCheckpointingInfo();
                 checkpointingInfo = stats.getCheckpointingInfo();
                 transformIndexerStats = stats.getIndexerStats();
                 transformIndexerStats = stats.getIndexerStats();
             }
             }
 
 
-            table
-                .startRow()
+            Integer maxPageSearchSize = config.getSettings() == null || config.getSettings().getMaxPageSearchSize() == null
+                ? config.getPivotConfig() == null || config.getPivotConfig().getMaxPageSearchSize() == null
+                    ? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE
+                    : config.getPivotConfig().getMaxPageSearchSize()
+                : config.getSettings().getMaxPageSearchSize();
+
+            table.startRow()
                 .addCell(config.getId())
                 .addCell(config.getId())
                 .addCell(config.getCreateTime())
                 .addCell(config.getCreateTime())
                 .addCell(config.getVersion())
                 .addCell(config.getVersion())
@@ -235,9 +208,13 @@ public class RestCatTransformAction extends AbstractCatAction {
                 .addCell(config.getDestination().getPipeline())
                 .addCell(config.getDestination().getPipeline())
                 .addCell(config.getDescription())
                 .addCell(config.getDescription())
                 .addCell(config.getSyncConfig() == null ? "batch" : "continuous")
                 .addCell(config.getSyncConfig() == null ? "batch" : "continuous")
-                .addCell(config.getFrequency() == null ? DEFAULT_TRANSFORM_FREQUENCY : config.getFrequency())
-                .addCell(config.getPivotConfig() == null || config.getPivotConfig().getMaxPageSearchSize() == null ?
-                            DEFAULT_MAX_PAGE_SEARCH_SIZE : config.getPivotConfig().getMaxPageSearchSize())
+                .addCell(config.getFrequency() == null ? Transform.DEFAULT_TRANSFORM_FREQUENCY : config.getFrequency())
+                .addCell(maxPageSearchSize)
+                .addCell(
+                    config.getSettings() == null || config.getSettings().getDocsPerSecond() == null
+                        ? "-"
+                        : config.getSettings().getDocsPerSecond()
+                )
                 .addCell(stats == null ? null : stats.getState())
                 .addCell(stats == null ? null : stats.getState())
                 .addCell(stats == null ? null : stats.getReason())
                 .addCell(stats == null ? null : stats.getReason())
                 .addCell(checkpointingInfo == null ? null : checkpointingInfo.getChangesLastDetectedAt())
                 .addCell(checkpointingInfo == null ? null : checkpointingInfo.getChangesLastDetectedAt())

+ 2 - 2
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

@@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
+import org.elasticsearch.xpack.transform.Transform;
 import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
 import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
 import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
 import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
 
 
@@ -49,7 +50,6 @@ import static org.elasticsearch.xpack.core.transform.TransformMessages.CANNOT_ST
 public class TransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener, TransformContext.Listener {
 public class TransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener, TransformContext.Listener {
 
 
     // Default interval the scheduler sends an event if the config does not specify a frequency
     // Default interval the scheduler sends an event if the config does not specify a frequency
-    private static final long SCHEDULER_NEXT_MILLISECONDS = 60000;
     private static final Logger logger = LogManager.getLogger(TransformTask.class);
     private static final Logger logger = LogManager.getLogger(TransformTask.class);
     private static final IndexerState[] RUNNING_STATES = new IndexerState[] { IndexerState.STARTED, IndexerState.INDEXING };
     private static final IndexerState[] RUNNING_STATES = new IndexerState[] { IndexerState.STARTED, IndexerState.INDEXING };
     public static final String SCHEDULE_NAME = TransformField.TASK_NAME + "/schedule";
     public static final String SCHEDULE_NAME = TransformField.TASK_NAME + "/schedule";
@@ -538,7 +538,7 @@ public class TransformTask extends AllocatedPersistentTask implements SchedulerE
     private SchedulerEngine.Schedule next() {
     private SchedulerEngine.Schedule next() {
         return (startTime, now) -> {
         return (startTime, now) -> {
             TimeValue frequency = transform.getFrequency();
             TimeValue frequency = transform.getFrequency();
-            return now + (frequency == null ? SCHEDULER_NEXT_MILLISECONDS : frequency.getMillis());
+            return now + (frequency == null ? Transform.DEFAULT_TRANSFORM_FREQUENCY.getMillis() : frequency.getMillis());
         };
         };
     }
     }
 
 

+ 7 - 9
x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java

@@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
+import org.elasticsearch.xpack.transform.Transform;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collection;
@@ -49,7 +50,6 @@ import java.util.stream.Stream;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 
 
 public class Pivot {
 public class Pivot {
-    public static final int DEFAULT_INITIAL_PAGE_SIZE = 500;
     public static final int TEST_QUERY_PAGE_SIZE = 50;
     public static final int TEST_QUERY_PAGE_SIZE = 50;
 
 
     private static final String COMPOSITE_AGGREGATION_NAME = "_transform";
     private static final String COMPOSITE_AGGREGATION_NAME = "_transform";
@@ -103,12 +103,10 @@ public class Pivot {
             listener.onResponse(true);
             listener.onResponse(true);
         }, e -> {
         }, e -> {
             Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
             Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
-            RestStatus status = unwrapped instanceof ElasticsearchException ?
-                ((ElasticsearchException)unwrapped).status() :
-                RestStatus.SERVICE_UNAVAILABLE;
-            listener.onFailure(new ElasticsearchStatusException("Failed to test query",
-                status,
-                unwrapped));
+            RestStatus status = unwrapped instanceof ElasticsearchException
+                ? ((ElasticsearchException) unwrapped).status()
+                : RestStatus.SERVICE_UNAVAILABLE;
+            listener.onFailure(new ElasticsearchStatusException("Failed to test query", status, unwrapped));
         }));
         }));
     }
     }
 
 
@@ -124,14 +122,14 @@ public class Pivot {
      * per page the page size is a multiplier for the costs of aggregating bucket.
      * per page the page size is a multiplier for the costs of aggregating bucket.
      *
      *
      * The user may set a maximum in the {@link PivotConfig#getMaxPageSearchSize()}, but if that is not provided,
      * The user may set a maximum in the {@link PivotConfig#getMaxPageSearchSize()}, but if that is not provided,
-     *    the default {@link Pivot#DEFAULT_INITIAL_PAGE_SIZE} is used.
+     *    the default {@link Transform#DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE} is used.
      *
      *
      * In future we might inspect the configuration and base the initial size on the aggregations used.
      * In future we might inspect the configuration and base the initial size on the aggregations used.
      *
      *
      * @return the page size
      * @return the page size
      */
      */
     public int getInitialPageSize() {
     public int getInitialPageSize() {
-        return config.getMaxPageSearchSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getMaxPageSearchSize();
+        return config.getMaxPageSearchSize() == null ? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE : config.getMaxPageSearchSize();
     }
     }
 
 
     public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {
     public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {

+ 2 - 2
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java

@@ -36,11 +36,11 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
 import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
+import org.elasticsearch.xpack.transform.Transform;
 import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
 import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
 import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
 import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
 import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
 import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
 import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager;
 import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager;
-import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 
 
@@ -240,7 +240,7 @@ public class TransformIndexerTests extends ESTestCase {
             new SettingsConfig(pageSize, null)
             new SettingsConfig(pageSize, null)
         );
         );
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-        final long initialPageSize = pageSize == null ? Pivot.DEFAULT_INITIAL_PAGE_SIZE : pageSize;
+        final long initialPageSize = pageSize == null ? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE : pageSize;
         Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
         Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
             throw new SearchPhaseExecutionException(
             throw new SearchPhaseExecutionException(
                 "query",
                 "query",

+ 2 - 1
x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java

@@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
 import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
+import org.elasticsearch.xpack.transform.Transform;
 import org.elasticsearch.xpack.transform.transforms.pivot.Aggregations.AggregationType;
 import org.elasticsearch.xpack.transform.transforms.pivot.Aggregations.AggregationType;
 import org.junit.After;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
@@ -108,7 +109,7 @@ public class PivotTests extends ESTestCase {
         assertThat(pivot.getInitialPageSize(), equalTo(expectedPageSize));
         assertThat(pivot.getInitialPageSize(), equalTo(expectedPageSize));
 
 
         pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null));
         pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null));
-        assertThat(pivot.getInitialPageSize(), equalTo(Pivot.DEFAULT_INITIAL_PAGE_SIZE));
+        assertThat(pivot.getInitialPageSize(), equalTo(Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE));
 
 
         assertWarnings("[max_page_search_size] is deprecated inside pivot please use settings instead");
         assertWarnings("[max_page_search_size] is deprecated inside pivot please use settings instead");
     }
     }