Browse Source

[ML] adding pivot.max_page_search_size option for setting paging size (#41920)

* [ML] adding pivot.size option for setting paging size

* Changing field name to address PR comments

* fixing ctor usage

* adjust hlrc for field name change
Benjamin Trent 6 years ago
parent
commit
da4899f786
14 changed files with 154 additions and 32 deletions
  1. 33 5
      client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java
  2. 3 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java
  3. 3 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java
  4. 5 0
      docs/java-rest/high-level/dataframe/put_data_frame.asciidoc
  5. 1 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java
  6. 8 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java
  7. 20 4
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java
  8. 6 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java
  9. 7 1
      x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java
  10. 2 2
      x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java
  11. 5 3
      x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java
  12. 19 10
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java
  13. 12 2
      x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java
  14. 30 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

+ 33 - 5
client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfig.java

@@ -39,25 +39,29 @@ public class PivotConfig implements ToXContentObject {
 
     private static final ParseField GROUP_BY = new ParseField("group_by");
     private static final ParseField AGGREGATIONS = new ParseField("aggregations");
+    private static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
 
     private final GroupConfig groups;
     private final AggregationConfig aggregationConfig;
+    private final Integer maxPageSearchSize;
 
     private static final ConstructingObjectParser<PivotConfig, Void> PARSER = new ConstructingObjectParser<>("pivot_config", true,
-                args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1]));
+                args -> new PivotConfig((GroupConfig) args[0], (AggregationConfig) args[1], (Integer) args[2]));
 
     static {
         PARSER.declareObject(constructorArg(), (p, c) -> (GroupConfig.fromXContent(p)), GROUP_BY);
         PARSER.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p), AGGREGATIONS);
+        PARSER.declareInt(optionalConstructorArg(), MAX_PAGE_SEARCH_SIZE);
     }
 
     public static PivotConfig fromXContent(final XContentParser parser) {
         return PARSER.apply(parser, null);
     }
 
-    PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig) {
+    PivotConfig(GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) {
         this.groups = groups;
         this.aggregationConfig = aggregationConfig;
+        this.maxPageSearchSize = maxPageSearchSize;
     }
 
     @Override
@@ -65,6 +69,9 @@ public class PivotConfig implements ToXContentObject {
         builder.startObject();
         builder.field(GROUP_BY.getPreferredName(), groups);
         builder.field(AGGREGATIONS.getPreferredName(), aggregationConfig);
+        if (maxPageSearchSize != null) {
+            builder.field(MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
+        }
         builder.endObject();
         return builder;
     }
@@ -77,6 +84,10 @@ public class PivotConfig implements ToXContentObject {
         return groups;
     }
 
+    public Integer getMaxPageSearchSize() {
+        return maxPageSearchSize;
+    }
+
     @Override
     public boolean equals(Object other) {
         if (this == other) {
@@ -89,12 +100,14 @@ public class PivotConfig implements ToXContentObject {
 
         final PivotConfig that = (PivotConfig) other;
 
-        return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig);
+        return Objects.equals(this.groups, that.groups)
+            && Objects.equals(this.aggregationConfig, that.aggregationConfig)
+            && Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(groups, aggregationConfig);
+        return Objects.hash(groups, aggregationConfig, maxPageSearchSize);
     }
 
     public static Builder builder() {
@@ -104,6 +117,7 @@ public class PivotConfig implements ToXContentObject {
     public static class Builder {
         private GroupConfig groups;
         private AggregationConfig aggregationConfig;
+        private Integer maxPageSearchSize;
 
         /**
          * Set how to group the source data
@@ -135,8 +149,22 @@ public class PivotConfig implements ToXContentObject {
             return this;
         }
 
+        /**
+         * Sets the maximum paging maxPageSearchSize that the data frame transform can use when
+         * pulling the data from the source index.
+         *
+         * If OOM is triggered, the paging maxPageSearchSize is dynamically reduced so that the transform can continue to gather data.
+         *
+         * @param maxPageSearchSize Integer value between 10 and 10_000
+         * @return the {@link Builder} with the paging maxPageSearchSize set.
+         */
+        public Builder setMaxPageSearchSize(Integer maxPageSearchSize) {
+            this.maxPageSearchSize = maxPageSearchSize;
+            return this;
+        }
+
         public PivotConfig build() {
-            return new PivotConfig(groups, aggregationConfig);
+            return new PivotConfig(groups, aggregationConfig, maxPageSearchSize);
         }
     }
 }

+ 3 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/pivot/PivotConfigTests.java

@@ -32,7 +32,9 @@ import java.util.function.Predicate;
 public class PivotConfigTests extends AbstractXContentTestCase<PivotConfig> {
 
     public static PivotConfig randomPivotConfig() {
-        return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig());
+        return new PivotConfig(GroupConfigTests.randomGroupConfig(),
+            AggregationConfigTests.randomAggregationConfig(),
+            randomBoolean() ? null : randomIntBetween(10, 10_000));
     }
 
     @Override

+ 3 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java

@@ -138,8 +138,9 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
         // end::put-data-frame-transform-agg-config
         // tag::put-data-frame-transform-pivot-config
         PivotConfig pivotConfig = PivotConfig.builder()
-            .setGroups(groupConfig)
-            .setAggregationConfig(aggConfig)
+            .setGroups(groupConfig) // <1>
+            .setAggregationConfig(aggConfig) // <2>
+            .setMaxPageSearchSize(1000) // <3>
             .build();
         // end::put-data-frame-transform-pivot-config
         // tag::put-data-frame-transform-config

+ 5 - 0
docs/java-rest/high-level/dataframe/put_data_frame.asciidoc

@@ -66,6 +66,11 @@ Defines the pivot function `group by` fields and the aggregation to reduce the d
 --------------------------------------------------
 include-tagged::{doc-tests-file}[{api}-pivot-config]
 --------------------------------------------------
+<1> The `GroupConfig` to use in the pivot
+<2> The aggregations to use
+<3> The maximum paging size for the transform when pulling data
+from the source. The size dynamically adjusts as the transform
+is running to recover from and prevent OOM issues.
 
 ===== GroupConfig
 The grouping terms. Defines the group by and destination fields

+ 1 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java

@@ -27,6 +27,7 @@ public final class DataFrameField {
     public static final ParseField SOURCE = new ParseField("source");
     public static final ParseField DESTINATION = new ParseField("dest");
     public static final ParseField FORCE = new ParseField("force");
+    public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
 
     /**
      * Fields for checkpointing

+ 8 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java

@@ -56,6 +56,14 @@ public class PutDataFrameTransformAction extends Action<AcknowledgedResponse> {
         @Override
         public ActionRequestValidationException validate() {
             ActionRequestValidationException validationException = null;
+            if(config.getPivotConfig() != null
+                && config.getPivotConfig().getMaxPageSearchSize() != null
+                && (config.getPivotConfig().getMaxPageSearchSize() < 10 || config.getPivotConfig().getMaxPageSearchSize() > 10_000)) {
+                validationException = addValidationError(
+                    "pivot.max_page_search_size [" +
+                        config.getPivotConfig().getMaxPageSearchSize() + "] must be greater than 10 and less than 10,000",
+                    validationException);
+            }
             for(String failure : config.getPivotConfig().aggFieldValidation()) {
                 validationException = addValidationError(failure, validationException);
             }

+ 20 - 4
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfig.java

@@ -6,6 +6,7 @@
 
 package org.elasticsearch.xpack.core.dataframe.transforms.pivot;
 
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -35,6 +36,7 @@ public class PivotConfig implements Writeable, ToXContentObject {
     private static final String NAME = "data_frame_transform_pivot";
     private final GroupConfig groups;
     private final AggregationConfig aggregationConfig;
+    private final Integer maxPageSearchSize;
 
     private static final ConstructingObjectParser<PivotConfig, Void> STRICT_PARSER = createParser(false);
     private static final ConstructingObjectParser<PivotConfig, Void> LENIENT_PARSER = createParser(true);
@@ -61,7 +63,7 @@ public class PivotConfig implements Writeable, ToXContentObject {
                         throw new IllegalArgumentException("Required [aggregations]");
                     }
 
-                    return new PivotConfig(groups, aggregationConfig);
+                    return new PivotConfig(groups, aggregationConfig, (Integer)args[3]);
                 });
 
         parser.declareObject(constructorArg(),
@@ -69,18 +71,21 @@ public class PivotConfig implements Writeable, ToXContentObject {
 
         parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGREGATIONS);
         parser.declareObject(optionalConstructorArg(), (p, c) -> AggregationConfig.fromXContent(p, lenient), DataFrameField.AGGS);
+        parser.declareInt(optionalConstructorArg(), DataFrameField.MAX_PAGE_SEARCH_SIZE);
 
         return parser;
     }
 
-    public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig) {
+    public PivotConfig(final GroupConfig groups, final AggregationConfig aggregationConfig, Integer maxPageSearchSize) {
         this.groups = ExceptionsHelper.requireNonNull(groups, DataFrameField.GROUP_BY.getPreferredName());
         this.aggregationConfig = ExceptionsHelper.requireNonNull(aggregationConfig, DataFrameField.AGGREGATIONS.getPreferredName());
+        this.maxPageSearchSize = maxPageSearchSize;
     }
 
     public PivotConfig(StreamInput in) throws IOException {
         this.groups = new GroupConfig(in);
         this.aggregationConfig = new AggregationConfig(in);
+        this.maxPageSearchSize = in.readOptionalInt();
     }
 
     @Override
@@ -88,6 +93,9 @@ public class PivotConfig implements Writeable, ToXContentObject {
         builder.startObject();
         builder.field(DataFrameField.GROUP_BY.getPreferredName(), groups);
         builder.field(DataFrameField.AGGREGATIONS.getPreferredName(), aggregationConfig);
+        if (maxPageSearchSize != null) {
+            builder.field(DataFrameField.MAX_PAGE_SEARCH_SIZE.getPreferredName(), maxPageSearchSize);
+        }
         builder.endObject();
         return builder;
     }
@@ -113,6 +121,7 @@ public class PivotConfig implements Writeable, ToXContentObject {
     public void writeTo(StreamOutput out) throws IOException {
         groups.writeTo(out);
         aggregationConfig.writeTo(out);
+        out.writeOptionalInt(maxPageSearchSize);
     }
 
     public AggregationConfig getAggregationConfig() {
@@ -123,6 +132,11 @@ public class PivotConfig implements Writeable, ToXContentObject {
         return groups;
     }
 
+    @Nullable
+    public Integer getMaxPageSearchSize() {
+        return maxPageSearchSize;
+    }
+
     @Override
     public boolean equals(Object other) {
         if (this == other) {
@@ -135,12 +149,14 @@ public class PivotConfig implements Writeable, ToXContentObject {
 
         final PivotConfig that = (PivotConfig) other;
 
-        return Objects.equals(this.groups, that.groups) && Objects.equals(this.aggregationConfig, that.aggregationConfig);
+        return Objects.equals(this.groups, that.groups)
+            && Objects.equals(this.aggregationConfig, that.aggregationConfig)
+            && Objects.equals(this.maxPageSearchSize, that.maxPageSearchSize);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(groups, aggregationConfig);
+        return Objects.hash(groups, aggregationConfig, maxPageSearchSize);
     }
 
     public boolean isValid() {

+ 6 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/pivot/PivotConfigTests.java

@@ -24,11 +24,15 @@ import static org.hamcrest.Matchers.empty;
 public class PivotConfigTests extends AbstractSerializingDataFrameTestCase<PivotConfig> {
 
     public static PivotConfig randomPivotConfig() {
-        return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig());
+        return new PivotConfig(GroupConfigTests.randomGroupConfig(),
+            AggregationConfigTests.randomAggregationConfig(),
+            randomBoolean() ? null : randomIntBetween(10, 10_000));
     }
 
     public static PivotConfig randomInvalidPivotConfig() {
-        return new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomInvalidAggregationConfig());
+        return new PivotConfig(GroupConfigTests.randomGroupConfig(),
+            AggregationConfigTests.randomInvalidAggregationConfig(),
+            randomBoolean() ? null : randomIntBetween(10, 10_000));
     }
 
     @Override

+ 7 - 1
x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java

@@ -172,7 +172,13 @@ abstract class DataFrameIntegTestCase extends ESIntegTestCase {
 
     protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
                                             AggregatorFactories.Builder aggregations) throws Exception {
-        return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations));
+        return createPivotConfig(groups, aggregations, null);
+    }
+
+    protected PivotConfig createPivotConfig(Map<String, SingleGroupSource> groups,
+                                            AggregatorFactories.Builder aggregations,
+                                            Integer size) throws Exception {
+        return new PivotConfig(createGroupConfig(groups), createAggConfig(aggregations), size);
     }
 
     protected DataFrameTransformConfig createTransformConfig(String id,

+ 2 - 2
x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java

@@ -130,7 +130,7 @@ public class DataFrameTransformProgressIT extends ESIntegTestCase {
         AggregatorFactories.Builder aggs = new AggregatorFactories.Builder();
         aggs.addAggregator(AggregationBuilders.avg("avg_rating").field("stars"));
         AggregationConfig aggregationConfig = new AggregationConfig(Collections.emptyMap(), aggs);
-        PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig);
+        PivotConfig pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
         DataFrameTransformConfig config = new DataFrameTransformConfig("get_progress_transform",
             sourceConfig,
             destConfig,
@@ -149,7 +149,7 @@ public class DataFrameTransformProgressIT extends ESIntegTestCase {
 
 
         QueryConfig queryConfig = new QueryConfig(Collections.emptyMap(), QueryBuilders.termQuery("user_id", "user_26"));
-        pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig);
+        pivotConfig = new PivotConfig(histgramGroupConfig, aggregationConfig, null);
         sourceConfig = new SourceConfig(new String[]{REVIEWS_INDEX_NAME}, queryConfig);
         config = new DataFrameTransformConfig("get_progress_transform",
             sourceConfig,

+ 5 - 3
x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java

@@ -76,13 +76,15 @@ public class Pivot {
      * the page size, the type of aggregations and the data. As the page size is the number of buckets we return
      * per page the page size is a multiplier for the costs of aggregating bucket.
      *
-     * Initially this returns a default, in future it might inspect the configuration and base the initial size
-     * on the aggregations used.
+     * The user may set a maximum in the {@link PivotConfig#getMaxPageSearchSize()}, but if that is not provided,
+     *    the default {@link Pivot#DEFAULT_INITIAL_PAGE_SIZE} is used.
+     *
+     * In future we might inspect the configuration and base the initial size on the aggregations used.
      *
      * @return the page size
      */
     public int getInitialPageSize() {
-        return DEFAULT_INITIAL_PAGE_SIZE;
+        return config.getMaxPageSearchSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getMaxPageSearchSize();
     }
 
     public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {

+ 19 - 10
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java

@@ -23,7 +23,9 @@ import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
 import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
-import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.AggregationConfigTests;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfigTests;
+import org.elasticsearch.xpack.core.dataframe.transforms.pivot.PivotConfig;
 import org.elasticsearch.xpack.core.indexing.IndexerState;
 import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
 import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
@@ -39,7 +41,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
+import static org.elasticsearch.xpack.core.dataframe.transforms.DestConfigTests.randomDestConfig;
+import static org.elasticsearch.xpack.core.dataframe.transforms.SourceConfigTests.randomSourceConfig;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -169,9 +174,15 @@ public class DataFrameIndexerTests extends ESTestCase {
     }
 
     public void testPageSizeAdapt() throws InterruptedException {
-        DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfig();
+        Integer pageSize = randomBoolean() ? null : randomIntBetween(500, 10_000);
+        DataFrameTransformConfig config = new DataFrameTransformConfig(randomAlphaOfLength(10),
+            randomSourceConfig(),
+            randomDestConfig(),
+            null,
+            new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize),
+            randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
         AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
-
+        final long initialPageSize = pageSize == null ? Pivot.DEFAULT_INITIAL_PAGE_SIZE : pageSize;
         Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
             throw new SearchPhaseExecutionException("query", "Partial shards failure", new ShardSearchFailure[] {
                     new ShardSearchFailure(new CircuitBreakingException("to much memory", 110, 100, Durability.TRANSIENT)) });
@@ -179,9 +190,7 @@ public class DataFrameIndexerTests extends ESTestCase {
 
         Function<BulkRequest, BulkResponse> bulkFunction = bulkRequest -> new BulkResponse(new BulkItemResponse[0], 100);
 
-        Consumer<Exception> failureConsumer = e -> {
-            fail("expected circuit breaker exception to be handled");
-        };
+        Consumer<Exception> failureConsumer = e -> fail("expected circuit breaker exception to be handled");
 
         final ExecutorService executor = Executors.newFixedThreadPool(1);
         try {
@@ -197,8 +206,8 @@ public class DataFrameIndexerTests extends ESTestCase {
             latch.countDown();
             awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);
             long pageSizeAfterFirstReduction = indexer.getPageSize();
-            assertTrue(Pivot.DEFAULT_INITIAL_PAGE_SIZE > pageSizeAfterFirstReduction);
-            assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE);
+            assertThat(initialPageSize, greaterThan(pageSizeAfterFirstReduction));
+            assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE));
 
             // run indexer a 2nd time
             final CountDownLatch secondRunLatch = indexer.newLatch(1);
@@ -211,8 +220,8 @@ public class DataFrameIndexerTests extends ESTestCase {
             awaitBusy(() -> indexer.getState() == IndexerState.STOPPED);
 
             // assert that page size has been reduced again
-            assertTrue(pageSizeAfterFirstReduction > indexer.getPageSize());
-            assertTrue(pageSizeAfterFirstReduction > DataFrameIndexer.MINIMUM_PAGE_SIZE);
+            assertThat(pageSizeAfterFirstReduction, greaterThan((long)indexer.getPageSize()));
+            assertThat(pageSizeAfterFirstReduction, greaterThan((long)DataFrameIndexer.MINIMUM_PAGE_SIZE));
 
         } finally {
             executor.shutdownNow();

+ 12 - 2
x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java

@@ -107,6 +107,16 @@ public class PivotTests extends ESTestCase {
         assertInvalidTransform(client, source, pivot);
     }
 
+    public void testInitialPageSize() throws Exception {
+        int expectedPageSize = 1000;
+
+        Pivot pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), expectedPageSize));
+        assertThat(pivot.getInitialPageSize(), equalTo(expectedPageSize));
+
+        pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null));
+        assertThat(pivot.getInitialPageSize(), equalTo(Pivot.DEFAULT_INITIAL_PAGE_SIZE));
+    }
+
     public void testSearchFailure() throws Exception {
         // test a failure during the search operation, transform creation fails if
         // search has failures although they might just be temporary
@@ -177,11 +187,11 @@ public class PivotTests extends ESTestCase {
     }
 
     private PivotConfig getValidPivotConfig() throws IOException {
-        return new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig());
+        return new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null);
     }
 
     private PivotConfig getValidPivotConfig(AggregationConfig aggregationConfig) throws IOException {
-        return new PivotConfig(GroupConfigTests.randomGroupConfig(), aggregationConfig);
+        return new PivotConfig(GroupConfigTests.randomGroupConfig(), aggregationConfig, null);
     }
 
     private AggregationConfig getValidAggregationConfig() throws IOException {

+ 30 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

@@ -303,6 +303,36 @@ setup:
             }
           }
 ---
+"Test put config with invalid pivot size":
+  - do:
+      catch: /pivot\.max_page_search_size \[5\] must be greater than 10 and less than 10,000/
+      data_frame.put_data_frame_transform:
+        transform_id: "airline-transform"
+        body: >
+          {
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-dest-index" },
+            "pivot": {
+              "max_page_search_size": 5,
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            }
+          }
+  - do:
+      catch: /pivot\.max_page_search_size \[15000\] must be greater than 10 and less than 10,000/
+      data_frame.put_data_frame_transform:
+        transform_id: "airline-transform"
+        body: >
+          {
+            "source": { "index": "airline-data" },
+            "dest": { "index": "airline-dest-index" },
+            "pivot": {
+              "max_page_search_size": 15000,
+              "group_by": { "airline": {"terms": {"field": "airline"}}},
+              "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
+            }
+          }
+---
 "Test creation failures due to duplicate and conflicting field names":
   - do:
       catch: /duplicate field \[airline\] detected/