Browse Source

Support max_single_primary_size in Resize Action and exposed in ILM (#67705)

bellengao 4 years ago
parent
commit
d69c03359f
28 changed files with 534 additions and 106 deletions
  1. 43 11
      client/rest-high-level/src/main/java/org/elasticsearch/client/ilm/ShrinkAction.java
  2. 16 0
      client/rest-high-level/src/main/java/org/elasticsearch/client/indices/ResizeRequest.java
  3. 1 1
      client/rest-high-level/src/test/java/org/elasticsearch/client/IndexLifecycleIT.java
  4. 4 0
      client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesRequestConvertersTests.java
  5. 2 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ILMDocumentationIT.java
  6. 11 3
      client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java
  7. 17 2
      client/rest-high-level/src/test/java/org/elasticsearch/client/ilm/ShrinkActionTests.java
  8. 6 2
      docs/java-rest/high-level/indices/shrink_index.asciidoc
  9. 51 13
      docs/reference/ilm/actions/ilm-shrink.asciidoc
  10. 13 1
      docs/reference/indices/shrink-index.asciidoc
  11. 37 0
      server/src/main/java/org/elasticsearch/action/admin/indices/shrink/ResizeRequest.java
  12. 9 0
      server/src/main/java/org/elasticsearch/action/admin/indices/shrink/ResizeRequestBuilder.java
  13. 62 3
      server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java
  14. 96 6
      server/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeActionTests.java
  15. 69 15
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkAction.java
  16. 22 10
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkStep.java
  17. 24 5
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ShrinkActionTests.java
  18. 31 12
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ShrinkStepTests.java
  19. 3 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java
  20. 1 1
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java
  21. 1 1
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/ExplainLifecycleIT.java
  22. 1 1
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java
  23. 7 7
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java
  24. 1 1
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeseriesMoveToStepIT.java
  25. 2 2
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java
  26. 1 1
      x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/ILMMultiNodeIT.java
  27. 1 1
      x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java
  28. 2 2
      x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java

+ 43 - 11
client/rest-high-level/src/main/java/org/elasticsearch/client/ilm/ShrinkAction.java

@@ -18,9 +18,12 @@
  */
 package org.elasticsearch.client.ilm;
 
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
@@ -31,31 +34,53 @@ import java.util.Objects;
 public class ShrinkAction implements LifecycleAction, ToXContentObject {
     public static final String NAME = "shrink";
     private static final ParseField NUMBER_OF_SHARDS_FIELD = new ParseField("number_of_shards");
+    private static final ParseField MAX_SINGLE_PRIMARY_SIZE = new ParseField("max_single_primary_size");
 
     private static final ConstructingObjectParser<ShrinkAction, Void> PARSER =
-        new ConstructingObjectParser<>(NAME, true, a -> new ShrinkAction((Integer) a[0]));
+        new ConstructingObjectParser<>(NAME, true, a -> new ShrinkAction((Integer) a[0], (ByteSizeValue) a[1]));
 
     static {
-        PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_SHARDS_FIELD);
+        PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), NUMBER_OF_SHARDS_FIELD);
+        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
+            (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_SINGLE_PRIMARY_SIZE.getPreferredName()),
+            MAX_SINGLE_PRIMARY_SIZE, ObjectParser.ValueType.STRING);
     }
 
-    private int numberOfShards;
+    private Integer numberOfShards;
+    private ByteSizeValue maxSinglePrimarySize;
 
     public static ShrinkAction parse(XContentParser parser) throws IOException {
         return PARSER.parse(parser, null);
     }
 
-    public ShrinkAction(int numberOfShards) {
-        if (numberOfShards <= 0) {
-            throw new IllegalArgumentException("[" + NUMBER_OF_SHARDS_FIELD.getPreferredName() + "] must be greater than 0");
+    public ShrinkAction(@Nullable Integer numberOfShards, ByteSizeValue maxSinglePrimarySize) {
+        if (numberOfShards != null && maxSinglePrimarySize != null) {
+            throw new IllegalArgumentException("Cannot set both [number_of_shards] and [max_single_primary_size]");
+        }
+        if (numberOfShards == null && maxSinglePrimarySize == null) {
+            throw new IllegalArgumentException("Either [number_of_shards] or [max_single_primary_size] must be set");
+        }
+        if (maxSinglePrimarySize != null) {
+            if (maxSinglePrimarySize.getBytes() <= 0) {
+                throw new IllegalArgumentException("[max_single_primary_size] must be greater than 0");
+            }
+            this.maxSinglePrimarySize = maxSinglePrimarySize;
+        } else {
+            if (numberOfShards <= 0) {
+                throw new IllegalArgumentException("[" + NUMBER_OF_SHARDS_FIELD.getPreferredName() + "] must be greater than 0");
+            }
+            this.numberOfShards = numberOfShards;
         }
-        this.numberOfShards = numberOfShards;
     }
 
-    int getNumberOfShards() {
+    Integer getNumberOfShards() {
         return numberOfShards;
     }
 
+    ByteSizeValue getMaxSinglePrimarySize() {
+        return maxSinglePrimarySize;
+    }
+
     @Override
     public String getName() {
         return NAME;
@@ -64,7 +89,12 @@ public class ShrinkAction implements LifecycleAction, ToXContentObject {
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
-        builder.field(NUMBER_OF_SHARDS_FIELD.getPreferredName(), numberOfShards);
+        if (numberOfShards != null) {
+            builder.field(NUMBER_OF_SHARDS_FIELD.getPreferredName(), numberOfShards);
+        }
+        if (maxSinglePrimarySize != null) {
+            builder.field(MAX_SINGLE_PRIMARY_SIZE.getPreferredName(), maxSinglePrimarySize);
+        }
         builder.endObject();
         return builder;
     }
@@ -74,12 +104,14 @@ public class ShrinkAction implements LifecycleAction, ToXContentObject {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         ShrinkAction that = (ShrinkAction) o;
-        return Objects.equals(numberOfShards, that.numberOfShards);
+
+        return Objects.equals(numberOfShards, that.numberOfShards) &&
+            Objects.equals(maxSinglePrimarySize, that.maxSinglePrimarySize);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(numberOfShards);
+        return Objects.hash(numberOfShards, maxSinglePrimarySize);
     }
 
     @Override

+ 16 - 0
client/rest-high-level/src/main/java/org/elasticsearch/client/indices/ResizeRequest.java

@@ -24,6 +24,7 @@ import org.elasticsearch.client.TimedRequest;
 import org.elasticsearch.client.Validatable;
 import org.elasticsearch.client.ValidationException;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 
@@ -45,6 +46,7 @@ public class ResizeRequest extends TimedRequest implements Validatable, ToXConte
     private final String targetIndex;
     private Settings settings = Settings.EMPTY;
     private Set<Alias> aliases = new HashSet<>();
+    private ByteSizeValue maxSinglePrimarySize;
 
     /**
      * Creates a new resize request
@@ -87,6 +89,20 @@ public class ResizeRequest extends TimedRequest implements Validatable, ToXConte
         return Collections.unmodifiableSet(this.aliases);
     }
 
+    /**
+     * Sets the max single primary shard size of the target index
+     */
+    public void setMaxSinglePrimarySize(ByteSizeValue maxSinglePrimarySize) {
+        this.maxSinglePrimarySize = maxSinglePrimarySize;
+    }
+
+    /**
+     * Return the max single primary shard size of the target index
+     */
+    public ByteSizeValue getMaxSinglePrimarySize() {
+        return maxSinglePrimarySize;
+    }
+
     @Override
     public Optional<ValidationException> validate() {
         ValidationException validationException = new ValidationException();

+ 1 - 1
client/rest-high-level/src/test/java/org/elasticsearch/client/IndexLifecycleIT.java

@@ -156,7 +156,7 @@ public class IndexLifecycleIT extends ESRestHighLevelClientTestCase {
         Map<String, LifecycleAction> warmActions = new HashMap<>();
         warmActions.put(UnfollowAction.NAME, new UnfollowAction());
         warmActions.put(AllocateAction.NAME, new AllocateAction(null, null, null, Collections.singletonMap("_name", "node-1")));
-        warmActions.put(ShrinkAction.NAME, new ShrinkAction(1));
+        warmActions.put(ShrinkAction.NAME, new ShrinkAction(1, null));
         warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1000));
         lifecyclePhases.put("warm", new Phase("warm", TimeValue.timeValueSeconds(1000), warmActions));
 

+ 4 - 0
client/rest-high-level/src/test/java/org/elasticsearch/client/IndicesRequestConvertersTests.java

@@ -62,6 +62,7 @@ import org.elasticsearch.client.indices.rollover.RolloverRequest;
 import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.common.xcontent.XContentType;
@@ -655,6 +656,9 @@ public class IndicesRequestConvertersTests extends ESTestCase {
         if (resizeType == ResizeType.SPLIT) {
             resizeRequest.setSettings(Settings.builder().put("index.number_of_shards", 2).build());
         }
+        if (resizeType == ResizeType.SHRINK) {
+            resizeRequest.setMaxSinglePrimarySize(new ByteSizeValue(randomIntBetween(1, 100)));
+        }
 
         Request request = function.apply(resizeRequest);
         Assert.assertEquals(HttpPut.METHOD_NAME, request.getMethod());

+ 2 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ILMDocumentationIT.java

@@ -264,7 +264,7 @@ public class ILMDocumentationIT extends ESRestHighLevelClientTestCase {
             PutLifecyclePolicyRequest putRequest = new PutLifecyclePolicyRequest(myPolicyAsPut);
 
             Map<String, Phase> otherPolicyPhases = new HashMap<>(phases);
-            Map<String, LifecycleAction> warmActions = Collections.singletonMap(ShrinkAction.NAME, new ShrinkAction(1));
+            Map<String, LifecycleAction> warmActions = Collections.singletonMap(ShrinkAction.NAME, new ShrinkAction(1, null));
             otherPolicyPhases.put("warm", new Phase("warm", new TimeValue(30, TimeUnit.DAYS), warmActions));
             otherPolicyAsPut = new LifecyclePolicy("other_policy", otherPolicyPhases);
 
@@ -614,7 +614,7 @@ public class ILMDocumentationIT extends ESRestHighLevelClientTestCase {
         {
             Map<String, Phase> phases = new HashMap<>();
             Map<String, LifecycleAction> warmActions = new HashMap<>();
-            warmActions.put(ShrinkAction.NAME, new ShrinkAction(3));
+            warmActions.put(ShrinkAction.NAME, new ShrinkAction(3, null));
             phases.put("warm", new Phase("warm", TimeValue.ZERO, warmActions));
 
             LifecyclePolicy policy = new LifecyclePolicy("my_policy",

+ 11 - 3
client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/IndicesClientDocumentationIT.java

@@ -1609,11 +1609,19 @@ public class IndicesClientDocumentationIT extends ESRestHighLevelClientTestCase
         request.setWaitForActiveShards(2); // <1>
         request.setWaitForActiveShards(ActiveShardCount.DEFAULT); // <2>
         // end::shrink-index-request-waitForActiveShards
-        // tag::shrink-index-request-settings
-        request.getTargetIndexRequest().settings(Settings.builder()
+        if (randomBoolean()) {
+            // tag::shrink-index-request-settings
+            request.getTargetIndexRequest().settings(Settings.builder()
                 .put("index.number_of_shards", 2) // <1>
                 .putNull("index.routing.allocation.require._name")); // <2>
-        // end::shrink-index-request-settings
+            // end::shrink-index-request-settings
+        } else {
+            request.getTargetIndexRequest().settings(Settings.builder()
+                .putNull("index.routing.allocation.require._name"));
+            // tag::shrink-index-request-maxSinglePrimarySize
+            request.setMaxSinglePrimarySize(new ByteSizeValue(50, ByteSizeUnit.GB)); // <1>
+            // end::shrink-index-request-maxSinglePrimarySize
+        }
         // tag::shrink-index-request-aliases
         request.getTargetIndexRequest().alias(new Alias("target_alias")); // <1>
         // end::shrink-index-request-aliases

+ 17 - 2
client/rest-high-level/src/test/java/org/elasticsearch/client/ilm/ShrinkActionTests.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.client.ilm;
 
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.test.AbstractXContentTestCase;
 
@@ -38,7 +39,11 @@ public class ShrinkActionTests extends AbstractXContentTestCase<ShrinkAction> {
     }
 
     static ShrinkAction randomInstance() {
-        return new ShrinkAction(randomIntBetween(1, 100));
+        if (randomBoolean()) {
+            return new ShrinkAction(randomIntBetween(1, 100), null);
+        } else {
+            return new ShrinkAction(null, new ByteSizeValue(randomIntBetween(1, 100)));
+        }
     }
 
     @Override
@@ -47,7 +52,17 @@ public class ShrinkActionTests extends AbstractXContentTestCase<ShrinkAction> {
     }
 
     public void testNonPositiveShardNumber() {
-        Exception e = expectThrows(Exception.class, () -> new ShrinkAction(randomIntBetween(-100, 0)));
+        Exception e = expectThrows(Exception.class, () -> new ShrinkAction(randomIntBetween(-100, 0), null));
         assertThat(e.getMessage(), equalTo("[number_of_shards] must be greater than 0"));
     }
+
+    public void testMaxSinglePrimarySize() {
+        ByteSizeValue maxSinglePrimarySize1 = new ByteSizeValue(10);
+        Exception e1 = expectThrows(Exception.class, () -> new ShrinkAction(randomIntBetween(1, 100), maxSinglePrimarySize1));
+        assertThat(e1.getMessage(), equalTo("Cannot set both [number_of_shards] and [max_single_primary_size]"));
+
+        ByteSizeValue maxSinglePrimarySize2 = new ByteSizeValue(0);
+        Exception e2 = expectThrows(Exception.class, () -> new org.elasticsearch.client.ilm.ShrinkAction(null, maxSinglePrimarySize2));
+        assertThat(e2.getMessage(), equalTo("[max_single_primary_size] must be greater than 0"));
+    }
 }

+ 6 - 2
docs/java-rest/high-level/indices/shrink_index.asciidoc

@@ -54,6 +54,12 @@ include-tagged::{doc-tests-file}[{api}-request-settings]
 <1> The number of shards on the target of the shrink index request
 <2> Remove the allocation requirement copied from the source index
 
+["source","java",subs="attributes,callouts,macros"]
+--------------------------------------------------
+include-tagged::{doc-tests-file}[{api}-request-maxSinglePrimarySize]
+--------------------------------------------------
+<1> The max single primary shard size of the target index
+
 ["source","java",subs="attributes,callouts,macros"]
 --------------------------------------------------
 include-tagged::{doc-tests-file}[{api}-request-aliases]
@@ -75,5 +81,3 @@ include-tagged::{doc-tests-file}[{api}-response]
 <1> Indicates whether all of the nodes have acknowledged the request
 <2> Indicates whether the requisite number of shard copies were started for
 each shard in the index before timing out
-
-

+ 51 - 13
docs/reference/ilm/actions/ilm-shrink.asciidoc

@@ -4,24 +4,24 @@
 
 Phases allowed: hot, warm.
 
-Sets an index to <<dynamic-index-settings, read-only>> 
-and shrinks it into a new index with fewer primary shards. 
-The name of the new index is of the form `shrink-<original-index-name>`. 
-For example, if the name of the source index is _logs_, 
+Sets an index to <<dynamic-index-settings, read-only>>
+and shrinks it into a new index with fewer primary shards.
+The name of the new index is of the form `shrink-<original-index-name>`.
+For example, if the name of the source index is _logs_,
 the name of the shrunken index is _shrink-logs_.
 
-The shrink action allocates all primary shards of the index to one node so it 
+The shrink action allocates all primary shards of the index to one node so it
 can call the <<indices-shrink-index,Shrink API>> to shrink the index.
-After shrinking, it swaps aliases that point to the original index to the new shrunken index. 
+After shrinking, it swaps aliases that point to the original index to the new shrunken index.
 
 To use the `shrink` action in the `hot` phase, the `rollover` action *must* be present.
 If no rollover action is configured, {ilm-init} will reject the policy.
 
 [IMPORTANT]
-If the shrink action is used on a <<ccr-put-follow,follower index>>, 
+If the shrink action is used on a <<ccr-put-follow,follower index>>,
 policy execution waits until the leader index rolls over (or is
-<<skipping-rollover, otherwise marked complete>>), 
-then converts the follower index into a regular index with the 
+<<skipping-rollover, otherwise marked complete>>),
+then converts the follower index into a regular index with the
 <<ilm-unfollow,unfollow>> action before performing the shrink operation.
 
 If the managed index is part of a <<data-streams, data stream>>,
@@ -40,14 +40,30 @@ managed indices.
 [[ilm-shrink-options]]
 ==== Shrink options
 `number_of_shards`::
-(Required, integer) 
-Number of shards to shrink to. 
-Must be a factor of the number of shards in the source index.
+(Optional, integer)
+Number of shards to shrink to.
+Must be a factor of the number of shards in the source index. This parameter conflicts with
+`max_single_primary_size`, only one of them may be set.
+
+`max_single_primary_size`::
+(Optional, <<byte-units, byte units>>)
+The max single primary shard size for the target index. Used to find the optimum number of shards for the target index.
+When this parameter is set, each shard's storage in the target index will not be greater than the parameter.
+The shards count of the target index will still be a factor of the source index's shards count, but if the parameter
+is less than the single shard size in the source index, the shards count for the target index will be equal to the source index's shards count.
+For example, when this parameter is set to 50gb, if the source index has 60 primary shards with totaling 100gb, then the
+target index will have 2 primary shards, with each shard size of 50gb; if the source index has 60 primary shards
+with totaling 1000gb, then the target index will have 20 primary shards; if the source index has 60 primary shards
+with totaling 4000gb, then the target index will still have 60 primary shards. This parameter conflicts
+with `number_of_shards` in the `settings`, only one of them may be set.
 
 
 [[ilm-shrink-ex]]
 ==== Example
- 
+
+[[ilm-shrink-shards-ex]]
+===== Set the number of shards of the new shrunken index explicitly
+
 [source,console]
 --------------------------------------------------
 PUT _ilm/policy/my_policy
@@ -65,3 +81,25 @@ PUT _ilm/policy/my_policy
   }
 }
 --------------------------------------------------
+
+[[ilm-shrink-size-ex]]
+===== Calculate the number of shards of the new shrunken index based on the storage of the
+source index and the `max_single_primary_size` parameter
+
+[source,console]
+--------------------------------------------------
+PUT _ilm/policy/my_policy
+{
+  "policy": {
+    "phases": {
+      "warm": {
+        "actions": {
+          "shrink" : {
+            "max_single_primary_size": "50gb"
+          }
+        }
+      }
+    }
+  }
+}
+--------------------------------------------------

+ 13 - 1
docs/reference/indices/shrink-index.asciidoc

@@ -56,7 +56,7 @@ PUT /my_source_index/_settings
 // TEST[s/^/PUT my_source_index\n{"settings":{"index.number_of_shards":2}}\n/]
 
 <1> Removes replica shards for the index.
-<2> Relocates the index's shards to the `shrink_node_name` node. 
+<2> Relocates the index's shards to the `shrink_node_name` node.
     See <<shard-allocation-filtering>>.
 <3> Prevents write operations to this index. Metadata changes, such as deleting
     the index, are still allowed.
@@ -230,3 +230,15 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms]
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=target-index-aliases]
 
 include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=target-index-settings]
+
+`max_single_primary_size`::
+(Optional, <<byte-units, byte units>>)
+The max single primary shard size for the target index. Used to find the optimum number of shards for the target index.
+When this parameter is set, each shard's storage in the target index will not be greater than the parameter.
+The shards count of the target index will still be a factor of the source index's shards count, but if the parameter
+is less than the single shard size in the source index, the shards count for the target index will be equal to the source index's shards count.
+For example, when this parameter is set to 50gb, if the source index has 60 primary shards with totaling 100gb, then the
+target index will have 2 primary shards, with each shard size of 50gb; if the source index has 60 primary shards
+with totaling 1000gb, then the target index will have 20 primary shards; if the source index has 60 primary shards
+with totaling 4000gb, then the target index will still have 60 primary shards. This parameter conflicts
+with `number_of_shards` in the `settings`, only one of them may be set.

+ 37 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/shrink/ResizeRequest.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.action.admin.indices.shrink;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionRequestValidationException;
 import org.elasticsearch.action.IndicesRequest;
 import org.elasticsearch.action.admin.indices.alias.Alias;
@@ -29,6 +30,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -45,17 +47,22 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
 public class ResizeRequest extends AcknowledgedRequest<ResizeRequest> implements IndicesRequest, ToXContentObject {
 
     public static final ObjectParser<ResizeRequest, Void> PARSER = new ObjectParser<>("resize_request");
+    private static final ParseField MAX_SINGLE_PRIMARY_SIZE = new ParseField("max_single_primary_size");
     static {
         PARSER.declareField((parser, request, context) -> request.getTargetIndexRequest().settings(parser.map()),
             new ParseField("settings"), ObjectParser.ValueType.OBJECT);
         PARSER.declareField((parser, request, context) -> request.getTargetIndexRequest().aliases(parser.map()),
             new ParseField("aliases"), ObjectParser.ValueType.OBJECT);
+        PARSER.declareField(ResizeRequest::setMaxSinglePrimarySize,
+            (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_SINGLE_PRIMARY_SIZE.getPreferredName()),
+            MAX_SINGLE_PRIMARY_SIZE, ObjectParser.ValueType.STRING);
     }
 
     private CreateIndexRequest targetIndexRequest;
     private String sourceIndex;
     private ResizeType type = ResizeType.SHRINK;
     private Boolean copySettings = true;
+    private ByteSizeValue maxSinglePrimarySize;
 
     public ResizeRequest(StreamInput in) throws IOException {
         super(in);
@@ -63,6 +70,11 @@ public class ResizeRequest extends AcknowledgedRequest<ResizeRequest> implements
         sourceIndex = in.readString();
         type = in.readEnum(ResizeType.class);
         copySettings = in.readOptionalBoolean();
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            if (in.readBoolean()) {
+                maxSinglePrimarySize = new ByteSizeValue(in);
+            }
+        }
     }
 
     ResizeRequest() {}
@@ -87,6 +99,9 @@ public class ResizeRequest extends AcknowledgedRequest<ResizeRequest> implements
         if (type == ResizeType.SPLIT && IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexRequest.settings()) == false) {
             validationException = addValidationError("index.number_of_shards is required for split operations", validationException);
         }
+        if (maxSinglePrimarySize != null && maxSinglePrimarySize.getBytes() <= 0) {
+            validationException = addValidationError("max_single_primary_size must be greater than 0", validationException);
+        }
         assert copySettings == null || copySettings;
         return validationException;
     }
@@ -102,6 +117,9 @@ public class ResizeRequest extends AcknowledgedRequest<ResizeRequest> implements
         out.writeString(sourceIndex);
         out.writeEnum(type);
         out.writeOptionalBoolean(copySettings);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            out.writeOptionalWriteable(maxSinglePrimarySize);
+        }
     }
 
     @Override
@@ -184,6 +202,25 @@ public class ResizeRequest extends AcknowledgedRequest<ResizeRequest> implements
         return copySettings;
     }
 
+    /**
+     * Sets the max single primary shard size of the target index.
+     * It's used to calculate an optimum shards number of the target index according to storage of
+     * the source index, each shard's storage of the target index will not be greater than this parameter,
+     * while the shards number of the target index still be a factor of the source index's shards number.
+     *
+     * @param maxSinglePrimarySize the max single primary shard size of the target index
+     */
+    public void setMaxSinglePrimarySize(ByteSizeValue maxSinglePrimarySize) {
+        this.maxSinglePrimarySize = maxSinglePrimarySize;
+    }
+
+    /**
+     * Returns the max single primary shard size of the target index
+     */
+    public ByteSizeValue getMaxSinglePrimarySize() {
+        return maxSinglePrimarySize;
+    }
+
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();

+ 9 - 0
server/src/main/java/org/elasticsearch/action/admin/indices/shrink/ResizeRequestBuilder.java

@@ -24,6 +24,7 @@ import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
 import org.elasticsearch.client.ElasticsearchClient;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 
 public class ResizeRequestBuilder extends AcknowledgedRequestBuilder<ResizeRequest, ResizeResponse,
     ResizeRequestBuilder> {
@@ -79,4 +80,12 @@ public class ResizeRequestBuilder extends AcknowledgedRequestBuilder<ResizeReque
         this.request.setResizeType(type);
         return this;
     }
+
+    /**
+     * Sets the max single primary shard size of the target index.
+     */
+    public ResizeRequestBuilder setMaxSinglePrimarySize(ByteSizeValue maxSinglePrimarySize) {
+        this.request.setMaxSinglePrimarySize(maxSinglePrimarySize);
+        return this;
+    }
 }

+ 62 - 3
server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.action.admin.indices.shrink;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.IndexWriter;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
@@ -39,10 +41,12 @@ import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.shard.DocsStats;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.store.StoreStats;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
@@ -56,6 +60,8 @@ import java.util.function.IntFunction;
  * Main class to initiate resizing (shrink / split) an index into a new index
  */
 public class TransportResizeAction extends TransportMasterNodeAction<ResizeRequest, ResizeResponse> {
+    private static final Logger logger = LogManager.getLogger(TransportResizeAction.class);
+
     private final MetadataCreateIndexService createIndexService;
     private final Client client;
 
@@ -95,7 +101,8 @@ public class TransportResizeAction extends TransportMasterNodeAction<ResizeReque
             return;
         }
 
-        IndicesStatsRequestBuilder statsRequestBuilder = client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true);
+        IndicesStatsRequestBuilder statsRequestBuilder = client.admin().indices().prepareStats(sourceIndex).clear()
+            .setDocs(true).setStore(true);
         IndicesStatsRequest statsRequest = statsRequestBuilder.request();
         statsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
         // TODO: only fetch indices stats for shrink type resize requests
@@ -103,7 +110,8 @@ public class TransportResizeAction extends TransportMasterNodeAction<ResizeReque
             ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> {
                 final CreateIndexClusterStateUpdateRequest updateRequest;
                 try {
-                    updateRequest = prepareCreateIndexRequest(resizeRequest, sourceMetadata, i -> {
+                    StoreStats indexStoreStats = indicesStatsResponse.getPrimaries().store;
+                    updateRequest = prepareCreateIndexRequest(resizeRequest, sourceMetadata, indexStoreStats, i -> {
                         IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
                         return shard == null ? null : shard.getPrimary().getDocs();
                     }, targetIndex);
@@ -121,6 +129,7 @@ public class TransportResizeAction extends TransportMasterNodeAction<ResizeReque
     // static for unittesting this method
     static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final ResizeRequest resizeRequest,
                                                                           final IndexMetadata sourceMetadata,
+                                                                          final StoreStats indexStoreStats,
                                                                           final IntFunction<DocsStats> perShardDocStats,
                                                                           final String targetIndexName) {
         final CreateIndexRequest targetIndex = resizeRequest.getTargetIndexRequest();
@@ -129,12 +138,37 @@ public class TransportResizeAction extends TransportMasterNodeAction<ResizeReque
         targetIndexSettingsBuilder.remove(IndexMetadata.SETTING_HISTORY_UUID);
         final Settings targetIndexSettings = targetIndexSettingsBuilder.build();
         final int numShards;
+        ByteSizeValue maxSinglePrimarySize = resizeRequest.getMaxSinglePrimarySize();
         if (IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) {
+            if (resizeRequest.getResizeType() == ResizeType.SHRINK && maxSinglePrimarySize != null) {
+                throw new IllegalArgumentException("Cannot set both index.number_of_shards and max_single_primary_size" +
+                    " for the target index");
+            }
             numShards = IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.get(targetIndexSettings);
         } else {
             assert resizeRequest.getResizeType() != ResizeType.SPLIT : "split must specify the number of shards explicitly";
             if (resizeRequest.getResizeType() == ResizeType.SHRINK) {
-                numShards = 1;
+                if (maxSinglePrimarySize != null) {
+                    int sourceIndexShardsNum = sourceMetadata.getNumberOfShards();
+                    long sourceIndexStorageBytes = indexStoreStats.getSizeInBytes();
+                    long maxSinglePrimarySizeBytes = maxSinglePrimarySize.getBytes();
+                    long minShardsNum = sourceIndexStorageBytes / maxSinglePrimarySizeBytes;
+                    if (minShardsNum * maxSinglePrimarySizeBytes < sourceIndexStorageBytes) {
+                        minShardsNum = minShardsNum + 1;
+                    }
+                    if (minShardsNum > sourceIndexShardsNum) {
+                        logger.info("By setting max_single_primary_size to [{}], the target index [{}] will contain [{}] shards," +
+                                " which will be greater than [{}] shards in the source index [{}]," +
+                                " using [{}] for the shard count of the target index [{}]",
+                            maxSinglePrimarySize.toString(), targetIndexName, minShardsNum, sourceIndexShardsNum,
+                            sourceMetadata.getIndex().getName(), sourceIndexShardsNum, targetIndexName);
+                        numShards = sourceIndexShardsNum;
+                    } else {
+                        numShards = calTargetShardsNum(sourceIndexShardsNum, (int)minShardsNum);
+                    }
+                } else {
+                    numShards = 1;
+                }
             } else {
                 assert resizeRequest.getResizeType() == ResizeType.CLONE;
                 numShards = sourceMetadata.getNumberOfShards();
@@ -199,4 +233,29 @@ public class TransportResizeAction extends TransportMasterNodeAction<ResizeReque
                 .resizeType(resizeRequest.getResizeType())
                 .copySettings(resizeRequest.getCopySettings() == null ? false : resizeRequest.getCopySettings());
     }
+
+    // Get the minimum factor of sourceIndexShardsNum which is greater minShardsNum
+    protected static int calTargetShardsNum(final int sourceIndexShardsNum, final int minShardsNum) {
+        if (sourceIndexShardsNum <=0 || minShardsNum <=0){
+            return 1;
+        }
+        if (sourceIndexShardsNum % minShardsNum == 0) {
+            return minShardsNum;
+        }
+        int num = (int) Math.floor(Math.sqrt(sourceIndexShardsNum));
+        if (minShardsNum >= num) {
+            for (int i = num; i >= 1; i--) {
+                if (sourceIndexShardsNum % i == 0 && minShardsNum <= sourceIndexShardsNum / i) {
+                    return sourceIndexShardsNum / i;
+                }
+            }
+        } else {
+            for (int i = 1; i < num; i++) {
+                if (sourceIndexShardsNum % i == 0 && minShardsNum <= i) {
+                    return i;
+                }
+            }
+        }
+        return sourceIndexShardsNum;
+    }
 }

+ 96 - 6
server/src/test/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeActionTests.java

@@ -39,7 +39,9 @@ import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllo
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.shard.DocsStats;
+import org.elasticsearch.index.store.StoreStats;
 import org.elasticsearch.snapshots.EmptySnapshotsInfoService;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.gateway.TestGatewayAllocator;
@@ -77,7 +79,8 @@ public class TransportResizeActionTests extends ESTestCase {
         assertTrue(
             expectThrows(IllegalStateException.class, () ->
                 TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), state,
-                (i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), "target")
+                    new StoreStats(between(1, 100), between(1, 100)), (i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000),
+                        between(1, 100)), "target")
         ).getMessage().startsWith("Can't merge index with more than [2147483519] docs - too many documents in shards "));
 
 
@@ -88,6 +91,7 @@ public class TransportResizeActionTests extends ESTestCase {
                     TransportResizeAction.prepareCreateIndexRequest(req,
                             createClusterState("source", 8, 1,
                                     Settings.builder().put("index.blocks.write", true).build()).metadata().index("source"),
+                        new StoreStats(between(1, 100), between(1, 100)),
                         (i) -> i == 2 || i == 3 ? new DocsStats(Integer.MAX_VALUE / 2, between(1, 1000), between(1, 10000)) : null
                         , "target");
                 }
@@ -101,6 +105,7 @@ public class TransportResizeActionTests extends ESTestCase {
                     createClusterState("source", 8, 1,
                             Settings.builder().put("index.blocks.write", true).put("index.soft_deletes.enabled", true).build())
                             .metadata().index("source"),
+                new StoreStats(between(1, 100), between(1, 100)),
                 (i) -> new DocsStats(between(10, 1000), between(1, 10), between(1, 10000)), "target");
         });
         assertThat(softDeletesError.getMessage(), equalTo("Can't disable [index.soft_deletes.enabled] setting on resize"));
@@ -121,6 +126,7 @@ public class TransportResizeActionTests extends ESTestCase {
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
 
         TransportResizeAction.prepareCreateIndexRequest(new ResizeRequest("target", "source"), clusterState.metadata().index("source"),
+            new StoreStats(between(1, 100), between(1, 100)),
             (i) -> new DocsStats(between(1, 1000), between(1, 1000), between(0, 10000)), "target");
     }
 
@@ -144,14 +150,16 @@ public class TransportResizeActionTests extends ESTestCase {
         resizeRequest.getTargetIndexRequest()
             .settings(Settings.builder().put("index.number_of_shards", 2).build());
         IndexMetadata indexMetadata = clusterState.metadata().index("source");
-        TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata, null, "target");
+        TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata, new StoreStats(between(1, 100), between(1, 100)),
+            null, "target");
 
         resizeRequest.getTargetIndexRequest()
             .settings(Settings.builder()
                 .put("index.number_of_routing_shards", randomIntBetween(2, 10))
                 .put("index.number_of_shards", 2)
                 .build());
-        TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata, null, "target");
+        TransportResizeAction.prepareCreateIndexRequest(resizeRequest, indexMetadata, new StoreStats(between(1, 100), between(1, 100)),
+            null, "target");
     }
 
     public void testPassNumRoutingShardsAndFail() {
@@ -174,7 +182,8 @@ public class TransportResizeActionTests extends ESTestCase {
         resizeRequest.setResizeType(ResizeType.SPLIT);
         resizeRequest.getTargetIndexRequest()
             .settings(Settings.builder().put("index.number_of_shards", numShards * 2).build());
-        TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState.metadata().index("source"), null, "target");
+        TransportResizeAction.prepareCreateIndexRequest(resizeRequest, clusterState.metadata().index("source"),
+            new StoreStats(between(1, 100), between(1, 100)), null, "target");
 
         resizeRequest.getTargetIndexRequest()
             .settings(Settings.builder()
@@ -182,7 +191,8 @@ public class TransportResizeActionTests extends ESTestCase {
                 .put("index.number_of_routing_shards", numShards * 2).build());
         ClusterState finalState = clusterState;
         IllegalArgumentException iae = expectThrows(IllegalArgumentException.class,
-            () -> TransportResizeAction.prepareCreateIndexRequest(resizeRequest, finalState.metadata().index("source"), null, "target"));
+            () -> TransportResizeAction.prepareCreateIndexRequest(resizeRequest, finalState.metadata().index("source"),
+                new StoreStats(between(1, 100), between(1, 100)), null, "target"));
         assertEquals("cannot provide index.number_of_routing_shards on resize", iae.getMessage());
     }
 
@@ -210,7 +220,7 @@ public class TransportResizeActionTests extends ESTestCase {
         final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
         target.setWaitForActiveShards(activeShardCount);
         CreateIndexClusterStateUpdateRequest request = TransportResizeAction.prepareCreateIndexRequest(
-            target, clusterState.metadata().index(indexName), (i) -> stats, "target");
+            target, clusterState.metadata().index(indexName), new StoreStats(between(1, 100), between(1, 100)), (i) -> stats, "target");
         assertNotNull(request.recoverFrom());
         assertEquals(indexName, request.recoverFrom().getName());
         assertEquals("1", request.settings().get("index.number_of_shards"));
@@ -218,6 +228,86 @@ public class TransportResizeActionTests extends ESTestCase {
         assertEquals(request.waitForActiveShards(), activeShardCount);
     }
 
+    public void testCalculateTargetShardsNum() {
+        assertEquals(TransportResizeAction.calTargetShardsNum(0, 0), 1);
+        assertEquals(TransportResizeAction.calTargetShardsNum(10, 0), 1);
+        assertEquals(TransportResizeAction.calTargetShardsNum(10, 1), 1);
+        assertEquals(TransportResizeAction.calTargetShardsNum(10, 2), 2);
+        assertEquals(TransportResizeAction.calTargetShardsNum(10, 3), 5);
+        assertEquals(TransportResizeAction.calTargetShardsNum(10, 6), 10);
+        assertEquals(TransportResizeAction.calTargetShardsNum(10, 11), 10);
+        assertEquals(TransportResizeAction.calTargetShardsNum(59, 21), 59);
+        assertEquals(TransportResizeAction.calTargetShardsNum(60, 21), 30);
+        assertEquals(TransportResizeAction.calTargetShardsNum(60, 31), 60);
+    }
+
+    public void testShrinkWithMaxSinglePrimarySize() {
+        int sourceIndexShardsNum = randomIntBetween(2, 42);
+        IndexMetadata state = createClusterState("source", sourceIndexShardsNum, randomIntBetween(0, 10),
+            Settings.builder().put("index.blocks.write", true).build()).metadata().index("source");
+        ResizeRequest resizeRequest = new ResizeRequest("target", "source");
+        resizeRequest.setMaxSinglePrimarySize(new ByteSizeValue(10));
+        resizeRequest.getTargetIndexRequest()
+            .settings(Settings.builder().put("index.number_of_shards", 2).build());
+        assertTrue(
+            expectThrows(IllegalArgumentException.class, () ->
+                TransportResizeAction.prepareCreateIndexRequest(resizeRequest, state, new StoreStats(between(1, 100), between(1, 100)),
+                    (i) -> new DocsStats(Integer.MAX_VALUE, between(1, 1000), between(1, 100)), "target")
+            ).getMessage().startsWith("Cannot set both index.number_of_shards and max_single_primary_size for the target index"));
+
+        // create one that won't fail
+        ClusterState clusterState = ClusterState.builder(createClusterState("source", 10, 0,
+            Settings.builder()
+                .put("index.blocks.write", true)
+                .build())).nodes(DiscoveryNodes.builder().add(newNode("node1")))
+            .build();
+        AllocationService service = new AllocationService(new AllocationDeciders(
+            Collections.singleton(new MaxRetryAllocationDecider())),
+            new TestGatewayAllocator(), new BalancedShardsAllocator(Settings.EMPTY), EmptyClusterInfoService.INSTANCE,
+            EmptySnapshotsInfoService.INSTANCE);
+
+        RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        // now we start the shard
+        routingTable = ESAllocationTestCase.startInitializingShardsAndReroute(service, clusterState, "source").routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        int numSourceShards = clusterState.metadata().index("source").getNumberOfShards();
+        DocsStats stats = new DocsStats(between(0, (IndexWriter.MAX_DOCS) / numSourceShards), between(1, 1000), between(1, 10000));
+
+        // each shard's storage will not be greater than the `max_single_primary_size`
+        ResizeRequest target1 = new ResizeRequest("target", "source");
+        target1.setMaxSinglePrimarySize(new ByteSizeValue(2));
+        StoreStats storeStats = new StoreStats(10, between(1, 100));
+        final int targetIndexShardsNum1 = 5;
+        final ActiveShardCount activeShardCount1 = ActiveShardCount.from(targetIndexShardsNum1);
+        target1.setWaitForActiveShards(targetIndexShardsNum1);
+
+        CreateIndexClusterStateUpdateRequest request1 = TransportResizeAction.prepareCreateIndexRequest(
+            target1, clusterState.metadata().index("source"), storeStats, (i) -> stats, "target");
+        assertNotNull(request1.recoverFrom());
+        assertEquals("source", request1.recoverFrom().getName());
+        assertEquals(String.valueOf(targetIndexShardsNum1), request1.settings().get("index.number_of_shards"));
+        assertEquals("shrink_index", request1.cause());
+        assertEquals(request1.waitForActiveShards(), activeShardCount1);
+
+        // if `max_single_primary_size` is less than the single shard size of the source index,
+        // the shards number of the target index will be equal to the source index's shards number
+        ResizeRequest target2 = new ResizeRequest("target2", "source");
+        target2.setMaxSinglePrimarySize(new ByteSizeValue(1));
+        StoreStats storeStats2 = new StoreStats(100, between(1, 100));
+        final int targetIndexShardsNum2 = 10;
+        final ActiveShardCount activeShardCount2 = ActiveShardCount.from(targetIndexShardsNum2);
+        target2.setWaitForActiveShards(activeShardCount2);
+
+        CreateIndexClusterStateUpdateRequest request2 = TransportResizeAction.prepareCreateIndexRequest(
+            target2, clusterState.metadata().index("source"), storeStats2, (i) -> stats, "target");
+        assertNotNull(request2.recoverFrom());
+        assertEquals("source", request2.recoverFrom().getName());
+        assertEquals(String.valueOf(targetIndexShardsNum2), request2.settings().get("index.number_of_shards"));
+        assertEquals("shrink_index", request2.cause());
+        assertEquals(request2.waitForActiveShards(), activeShardCount2);
+    }
+
     private DiscoveryNode newNode(String nodeId) {
         return new DiscoveryNode(
                 nodeId,

+ 69 - 15
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkAction.java

@@ -7,15 +7,19 @@ package org.elasticsearch.xpack.core.ilm;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.Version;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseField;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
@@ -34,40 +38,83 @@ public class ShrinkAction implements LifecycleAction {
     public static final String NAME = "shrink";
     public static final String SHRUNKEN_INDEX_PREFIX = "shrink-";
     public static final ParseField NUMBER_OF_SHARDS_FIELD = new ParseField("number_of_shards");
+    private static final ParseField MAX_SINGLE_PRIMARY_SIZE = new ParseField("max_single_primary_size");
     public static final String CONDITIONAL_SKIP_SHRINK_STEP = BranchingStep.NAME + "-check-prerequisites";
     public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";
 
     private static final ConstructingObjectParser<ShrinkAction, Void> PARSER =
-        new ConstructingObjectParser<>(NAME, a -> new ShrinkAction((Integer) a[0]));
+        new ConstructingObjectParser<>(NAME, a -> new ShrinkAction((Integer) a[0], (ByteSizeValue) a[1]));
 
     static {
-        PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_SHARDS_FIELD);
+        PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), NUMBER_OF_SHARDS_FIELD);
+        PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
+            (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_SINGLE_PRIMARY_SIZE.getPreferredName()),
+            MAX_SINGLE_PRIMARY_SIZE, ObjectParser.ValueType.STRING);
     }
 
-    private int numberOfShards;
+    private Integer numberOfShards;
+    private ByteSizeValue maxSinglePrimarySize;
 
     public static ShrinkAction parse(XContentParser parser) throws IOException {
         return PARSER.parse(parser, null);
     }
 
-    public ShrinkAction(int numberOfShards) {
-        if (numberOfShards <= 0) {
-            throw new IllegalArgumentException("[" + NUMBER_OF_SHARDS_FIELD.getPreferredName() + "] must be greater than 0");
+    public ShrinkAction(@Nullable Integer numberOfShards, @Nullable ByteSizeValue maxSinglePrimarySize) {
+        if (numberOfShards != null && maxSinglePrimarySize != null) {
+            throw new IllegalArgumentException("Cannot set both [number_of_shards] and [max_single_primary_size]");
+        }
+        if (numberOfShards == null && maxSinglePrimarySize == null) {
+            throw new IllegalArgumentException("Either [number_of_shards] or [max_single_primary_size] must be set");
+        }
+        if (maxSinglePrimarySize != null) {
+            if (maxSinglePrimarySize.getBytes() <= 0) {
+                throw new IllegalArgumentException("[max_single_primary_size] must be greater than 0");
+            }
+            this.maxSinglePrimarySize = maxSinglePrimarySize;
+        } else {
+            if (numberOfShards <= 0) {
+                throw new IllegalArgumentException("[" + NUMBER_OF_SHARDS_FIELD.getPreferredName() + "] must be greater than 0");
+            }
+            this.numberOfShards = numberOfShards;
         }
-        this.numberOfShards = numberOfShards;
     }
 
     public ShrinkAction(StreamInput in) throws IOException {
-        this.numberOfShards = in.readVInt();
+        if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
+            if (in.readBoolean()) {
+                this.numberOfShards = in.readVInt();
+                this.maxSinglePrimarySize = null;
+            } else {
+                this.numberOfShards = null;
+                this.maxSinglePrimarySize = new ByteSizeValue(in);
+            }
+        } else {
+            this.numberOfShards = in.readVInt();
+            this.maxSinglePrimarySize = null;
+        }
     }
 
-    int getNumberOfShards() {
+    Integer getNumberOfShards() {
         return numberOfShards;
     }
 
+    ByteSizeValue getMaxSinglePrimarySize() {
+        return maxSinglePrimarySize;
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
-        out.writeVInt(numberOfShards);
+        if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
+            boolean hasNumberOfShards = numberOfShards != null;
+            out.writeBoolean(hasNumberOfShards);
+            if (hasNumberOfShards) {
+                out.writeVInt(numberOfShards);
+            } else {
+                maxSinglePrimarySize.writeTo(out);
+            }
+        } else {
+            out.writeVInt(numberOfShards);
+        }
     }
 
     @Override
@@ -78,7 +125,12 @@ public class ShrinkAction implements LifecycleAction {
     @Override
     public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
         builder.startObject();
-        builder.field(NUMBER_OF_SHARDS_FIELD.getPreferredName(), numberOfShards);
+        if (numberOfShards != null) {
+            builder.field(NUMBER_OF_SHARDS_FIELD.getPreferredName(), numberOfShards);
+        }
+        if (maxSinglePrimarySize != null) {
+            builder.field(MAX_SINGLE_PRIMARY_SIZE.getPreferredName(), maxSinglePrimarySize);
+        }
         builder.endObject();
         return builder;
     }
@@ -110,7 +162,7 @@ public class ShrinkAction implements LifecycleAction {
         BranchingStep conditionalSkipShrinkStep = new BranchingStep(preShrinkBranchingKey, checkNotWriteIndex, nextStepKey,
             (index, clusterState) -> {
                 IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
-                if (indexMetadata.getNumberOfShards() == numberOfShards) {
+                if (numberOfShards != null && indexMetadata.getNumberOfShards() == numberOfShards) {
                     return true;
                 }
                 if (indexMetadata.getSettings().get(LifecycleSettings.SNAPSHOT_INDEX_NAME) != null) {
@@ -127,7 +179,8 @@ public class ShrinkAction implements LifecycleAction {
         UpdateSettingsStep readOnlyStep = new UpdateSettingsStep(readOnlyKey, setSingleNodeKey, client, readOnlySettings);
         SetSingleNodeAllocateStep setSingleNodeStep = new SetSingleNodeAllocateStep(setSingleNodeKey, allocationRoutedKey, client);
         CheckShrinkReadyStep checkShrinkReadyStep = new CheckShrinkReadyStep(allocationRoutedKey, shrinkKey);
-        ShrinkStep shrink = new ShrinkStep(shrinkKey, enoughShardsKey, client, numberOfShards, SHRUNKEN_INDEX_PREFIX);
+        ShrinkStep shrink = new ShrinkStep(shrinkKey, enoughShardsKey, client, numberOfShards, maxSinglePrimarySize,
+            SHRUNKEN_INDEX_PREFIX);
         ShrunkShardsAllocatedStep allocated = new ShrunkShardsAllocatedStep(enoughShardsKey, copyMetadataKey, SHRUNKEN_INDEX_PREFIX);
         CopyExecutionStateStep copyMetadata = new CopyExecutionStateStep(copyMetadataKey, dataStreamCheckBranchingKey,
             SHRUNKEN_INDEX_PREFIX, ShrunkenIndexCheckStep.NAME);
@@ -157,12 +210,13 @@ public class ShrinkAction implements LifecycleAction {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         ShrinkAction that = (ShrinkAction) o;
-        return Objects.equals(numberOfShards, that.numberOfShards);
+        return Objects.equals(numberOfShards, that.numberOfShards) &&
+            Objects.equals(maxSinglePrimarySize, that.maxSinglePrimarySize);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(numberOfShards);
+        return Objects.hash(numberOfShards, maxSinglePrimarySize);
     }
 
     @Override

+ 22 - 10
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/ShrinkStep.java

@@ -12,6 +12,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateObserver;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 
 import java.util.Objects;
 
@@ -21,19 +22,26 @@ import java.util.Objects;
 public class ShrinkStep extends AsyncActionStep {
     public static final String NAME = "shrink";
 
-    private int numberOfShards;
+    private Integer numberOfShards;
+    private ByteSizeValue maxSinglePrimarySize;
     private String shrunkIndexPrefix;
 
-    public ShrinkStep(StepKey key, StepKey nextStepKey, Client client, int numberOfShards, String shrunkIndexPrefix) {
+    public ShrinkStep(StepKey key, StepKey nextStepKey, Client client, Integer numberOfShards,
+                      ByteSizeValue maxSinglePrimarySize, String shrunkIndexPrefix) {
         super(key, nextStepKey, client);
         this.numberOfShards = numberOfShards;
+        this.maxSinglePrimarySize = maxSinglePrimarySize;
         this.shrunkIndexPrefix = shrunkIndexPrefix;
     }
 
-    public int getNumberOfShards() {
+    public Integer getNumberOfShards() {
         return numberOfShards;
     }
 
+    public ByteSizeValue getMaxSinglePrimarySize() {
+        return maxSinglePrimarySize;
+    }
+
     String getShrunkIndexPrefix() {
         return shrunkIndexPrefix;
     }
@@ -48,17 +56,20 @@ public class ShrinkStep extends AsyncActionStep {
 
         String lifecycle = LifecycleSettings.LIFECYCLE_NAME_SETTING.get(indexMetadata.getSettings());
 
-        Settings relevantTargetSettings = Settings.builder()
-            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
-            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, indexMetadata.getNumberOfReplicas())
+        Settings.Builder builder = Settings.builder();
+        // need to remove the single shard, allocation so replicas can be allocated
+        builder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, indexMetadata.getNumberOfReplicas())
             .put(LifecycleSettings.LIFECYCLE_NAME, lifecycle)
-            .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", (String) null) // need to remove the single shard
-                                                                                             // allocation so replicas can be allocated
-            .build();
+            .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", (String) null);
+        if (numberOfShards != null) {
+            builder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards);
+        }
+        Settings relevantTargetSettings = builder.build();
 
         String shrunkenIndexName = shrunkIndexPrefix + indexMetadata.getIndex().getName();
         ResizeRequest resizeRequest = new ResizeRequest(shrunkenIndexName, indexMetadata.getIndex().getName())
             .masterNodeTimeout(getMasterTimeout(currentState));
+        resizeRequest.setMaxSinglePrimarySize(maxSinglePrimarySize);
         resizeRequest.getTargetIndexRequest().settings(relevantTargetSettings);
 
         getClient().admin().indices().resizeIndex(resizeRequest, ActionListener.wrap(response -> {
@@ -72,7 +83,7 @@ public class ShrinkStep extends AsyncActionStep {
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), numberOfShards, shrunkIndexPrefix);
+        return Objects.hash(super.hashCode(), numberOfShards, maxSinglePrimarySize, shrunkIndexPrefix);
     }
 
     @Override
@@ -86,6 +97,7 @@ public class ShrinkStep extends AsyncActionStep {
         ShrinkStep other = (ShrinkStep) obj;
         return super.equals(obj) &&
                 Objects.equals(numberOfShards, other.numberOfShards) &&
+                Objects.equals(maxSinglePrimarySize, other.maxSinglePrimarySize) &&
                 Objects.equals(shrunkIndexPrefix, other.shrunkIndexPrefix);
     }
 

+ 24 - 5
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ShrinkActionTests.java

@@ -11,6 +11,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
@@ -34,12 +35,20 @@ public class ShrinkActionTests extends AbstractActionTestCase<ShrinkAction> {
     }
 
     static ShrinkAction randomInstance() {
-        return new ShrinkAction(randomIntBetween(1, 100));
+        if (randomBoolean()) {
+            return new ShrinkAction(randomIntBetween(1, 100), null);
+        } else {
+            return new ShrinkAction(null, new ByteSizeValue(randomIntBetween(1, 100)));
+        }
     }
 
     @Override
     protected ShrinkAction mutateInstance(ShrinkAction action) {
-        return new ShrinkAction(action.getNumberOfShards() + randomIntBetween(1, 2));
+        if (action.getNumberOfShards() != null) {
+            return new ShrinkAction(action.getNumberOfShards() + randomIntBetween(1, 2), null);
+        } else {
+            return new ShrinkAction(null, new ByteSizeValue(action.getMaxSinglePrimarySize().getBytes() + 1));
+        }
     }
 
     @Override
@@ -48,14 +57,24 @@ public class ShrinkActionTests extends AbstractActionTestCase<ShrinkAction> {
     }
 
     public void testNonPositiveShardNumber() {
-        Exception e = expectThrows(Exception.class, () -> new ShrinkAction(randomIntBetween(-100, 0)));
+        Exception e = expectThrows(Exception.class, () -> new ShrinkAction(randomIntBetween(-100, 0), null));
         assertThat(e.getMessage(), equalTo("[number_of_shards] must be greater than 0"));
     }
 
+    public void testMaxSinglePrimarySize() {
+        ByteSizeValue maxSinglePrimarySize1 = new ByteSizeValue(10);
+        Exception e1 = expectThrows(Exception.class, () -> new ShrinkAction(randomIntBetween(1, 100), maxSinglePrimarySize1));
+        assertThat(e1.getMessage(), equalTo("Cannot set both [number_of_shards] and [max_single_primary_size]"));
+
+        ByteSizeValue maxSinglePrimarySize2 = new ByteSizeValue(0);
+        Exception e2 = expectThrows(Exception.class, () -> new ShrinkAction(null, maxSinglePrimarySize2));
+        assertThat(e2.getMessage(), equalTo("[max_single_primary_size] must be greater than 0"));
+    }
+
     public void testPerformActionWithSkip() {
         String lifecycleName = randomAlphaOfLengthBetween(4, 10);
         int numberOfShards = randomIntBetween(1, 10);
-        ShrinkAction action = new ShrinkAction(numberOfShards);
+        ShrinkAction action = new ShrinkAction(numberOfShards, null);
         String phase = randomAlphaOfLengthBetween(1, 10);
         StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
             randomAlphaOfLengthBetween(1, 10));
@@ -90,7 +109,7 @@ public class ShrinkActionTests extends AbstractActionTestCase<ShrinkAction> {
         int divisor = randomFrom(2, 3, 6);
         int expectedFinalShards = numShards / divisor;
         String lifecycleName = randomAlphaOfLengthBetween(4, 10);
-        ShrinkAction action = new ShrinkAction(expectedFinalShards);
+        ShrinkAction action = new ShrinkAction(expectedFinalShards, null);
         String phase = randomAlphaOfLengthBetween(1, 10);
         StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
             randomAlphaOfLengthBetween(1, 10));

+ 31 - 12
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/ShrinkStepTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.indices.shrink.ResizeResponse;
 import org.elasticsearch.cluster.metadata.AliasMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.xpack.core.ilm.AsyncActionStep.Listener;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
 import org.mockito.Mockito;
@@ -30,16 +31,23 @@ public class ShrinkStepTests extends AbstractStepTestCase<ShrinkStep> {
     public ShrinkStep createRandomInstance() {
         StepKey stepKey = randomStepKey();
         StepKey nextStepKey = randomStepKey();
-        int numberOfShards = randomIntBetween(1, 20);
+        Integer numberOfShards = null;
+        ByteSizeValue maxSinglePrimarySize = null;
+        if (randomBoolean()) {
+            numberOfShards = randomIntBetween(1, 20);
+        } else {
+            maxSinglePrimarySize = new ByteSizeValue(between(1,100));
+        }
         String shrunkIndexPrefix = randomAlphaOfLength(10);
-        return new ShrinkStep(stepKey, nextStepKey, client, numberOfShards, shrunkIndexPrefix);
+        return new ShrinkStep(stepKey, nextStepKey, client, numberOfShards, maxSinglePrimarySize, shrunkIndexPrefix);
     }
 
     @Override
     public ShrinkStep mutateInstance(ShrinkStep instance) {
         StepKey key = instance.getKey();
         StepKey nextKey = instance.getNextStepKey();
-        int numberOfShards = instance.getNumberOfShards();
+        Integer numberOfShards = instance.getNumberOfShards();
+        ByteSizeValue maxSinglePrimarySize = instance.getMaxSinglePrimarySize();
         String shrunkIndexPrefix = instance.getShrunkIndexPrefix();
 
         switch (between(0, 3)) {
@@ -50,7 +58,12 @@ public class ShrinkStepTests extends AbstractStepTestCase<ShrinkStep> {
             nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
             break;
         case 2:
-            numberOfShards = numberOfShards + 1;
+            if (numberOfShards != null) {
+                numberOfShards = numberOfShards + 1;
+            }
+            if (maxSinglePrimarySize != null) {
+                maxSinglePrimarySize = new ByteSizeValue(maxSinglePrimarySize.getBytes() + 1);
+            }
             break;
         case 3:
             shrunkIndexPrefix += randomAlphaOfLength(5);
@@ -59,13 +72,13 @@ public class ShrinkStepTests extends AbstractStepTestCase<ShrinkStep> {
             throw new AssertionError("Illegal randomisation branch");
         }
 
-        return new ShrinkStep(key, nextKey, instance.getClient(), numberOfShards, shrunkIndexPrefix);
+        return new ShrinkStep(key, nextKey, instance.getClient(), numberOfShards, maxSinglePrimarySize, shrunkIndexPrefix);
     }
 
     @Override
     public ShrinkStep copyInstance(ShrinkStep instance) {
         return new ShrinkStep(instance.getKey(), instance.getNextStepKey(), instance.getClient(), instance.getNumberOfShards(),
-                instance.getShrunkIndexPrefix());
+            instance.getMaxSinglePrimarySize(), instance.getShrunkIndexPrefix());
     }
 
     public void testPerformAction() throws Exception {
@@ -91,14 +104,20 @@ public class ShrinkStepTests extends AbstractStepTestCase<ShrinkStep> {
             ActionListener<ResizeResponse> listener = (ActionListener<ResizeResponse>) invocation.getArguments()[1];
             assertThat(request.getSourceIndex(), equalTo(sourceIndexMetadata.getIndex().getName()));
             assertThat(request.getTargetIndexRequest().aliases(), equalTo(Collections.emptySet()));
-            assertThat(request.getTargetIndexRequest().settings(), equalTo(Settings.builder()
-                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, step.getNumberOfShards())
-                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas())
+
+            Settings.Builder builder = Settings.builder();
+            builder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, sourceIndexMetadata.getNumberOfReplicas())
                 .put(LifecycleSettings.LIFECYCLE_NAME, lifecycleName)
-                .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", (String) null)
-                .build()));
-            assertThat(request.getTargetIndexRequest().settings()
+                .put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", (String) null);
+            if (step.getNumberOfShards() != null) {
+                builder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, step.getNumberOfShards());
+            }
+            assertThat(request.getTargetIndexRequest().settings(), equalTo(builder.build()));
+            if (step.getNumberOfShards() != null) {
+                assertThat(request.getTargetIndexRequest().settings()
                     .getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, -1), equalTo(step.getNumberOfShards()));
+            }
+            request.setMaxSinglePrimarySize(step.getMaxSinglePrimarySize());
             listener.onResponse(new ResizeResponse(true, true, sourceIndexMetadata.getIndex().getName()));
             return null;
         }).when(indicesClient).resizeIndex(Mockito.any(), Mockito.any());

+ 3 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java

@@ -52,7 +52,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
     private static final WaitForSnapshotAction TEST_WAIT_FOR_SNAPSHOT_ACTION = new WaitForSnapshotAction("policy");
     private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(1, null);
     private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction(new ByteSizeValue(1), null, null);
-    private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction(1);
+    private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction(1, null);
     private static final ReadOnlyAction TEST_READ_ONLY_ACTION = new ReadOnlyAction();
     private static final FreezeAction TEST_FREEZE_ACTION = new FreezeAction();
     private static final SetPriorityAction TEST_PRIORITY_ACTION = new SetPriorityAction(0);
@@ -209,7 +209,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
 
     public void testValidateActionsFollowingSearchableSnapshot() {
         Phase hotPhase = new Phase("hot", TimeValue.ZERO, Map.of(SearchableSnapshotAction.NAME, new SearchableSnapshotAction("repo")));
-        Phase warmPhase = new Phase("warm", TimeValue.ZERO, Map.of(ShrinkAction.NAME, new ShrinkAction(1)));
+        Phase warmPhase = new Phase("warm", TimeValue.ZERO, Map.of(ShrinkAction.NAME, new ShrinkAction(1, null)));
         Phase coldPhase = new Phase("cold", TimeValue.ZERO, Map.of(FreezeAction.NAME, new FreezeAction()));
 
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
@@ -621,7 +621,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
             case RolloverAction.NAME:
                 return new RolloverAction(ByteSizeValue.parseBytesSizeValue("0b", "test"), TimeValue.ZERO, 1L);
             case ShrinkAction.NAME:
-                return new ShrinkAction(1);
+                return new ShrinkAction(1, null);
             case FreezeAction.NAME:
                 return new FreezeAction();
             case SetPriorityAction.NAME:

+ 1 - 1
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/TimeSeriesRestDriver.java

@@ -173,7 +173,7 @@ public final class TimeSeriesRestDriver {
         warmActions.put(ForceMergeAction.NAME, new ForceMergeAction(1, null));
         warmActions.put(AllocateAction.NAME, new AllocateAction(1, singletonMap("_name", "javaRestTest-0,javaRestTest-1,javaRestTest-2," +
             "javaRestTest-3"), null, null));
-        warmActions.put(ShrinkAction.NAME, new ShrinkAction(1));
+        warmActions.put(ShrinkAction.NAME, new ShrinkAction(1, null));
         Map<String, LifecycleAction> coldActions = new HashMap<>();
         coldActions.put(SetPriorityAction.NAME, new SetPriorityAction(0));
         coldActions.put(AllocateAction.NAME, new AllocateAction(0, singletonMap("_name", "javaRestTest-3"), null, null));

+ 1 - 1
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/ExplainLifecycleIT.java

@@ -71,7 +71,7 @@ public class ExplainLifecycleIT extends ESRestTestCase {
         {
             // Create a "shrink-only-policy"
             Map<String, LifecycleAction> warmActions = new HashMap<>();
-            warmActions.put(ShrinkAction.NAME, new ShrinkAction(17));
+            warmActions.put(ShrinkAction.NAME, new ShrinkAction(17, null));
             Map<String, Phase> phases = new HashMap<>();
             phases.put("warm", new Phase("warm", TimeValue.ZERO, warmActions));
             LifecyclePolicy lifecyclePolicy = new LifecyclePolicy("shrink-only-policy", phases);

+ 1 - 1
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesDataStreamsIT.java

@@ -88,7 +88,7 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase {
 
     public void testShrinkActionInPolicyWithoutHotPhase() throws Exception {
         String policyName = "logs-policy";
-        createNewSingletonPolicy(client(), policyName, "warm", new ShrinkAction(1));
+        createNewSingletonPolicy(client(), policyName, "warm", new ShrinkAction(1, null));
 
         createComposableTemplate(client(), "logs-template", "logs-foo*", getTemplate(policyName));
 

+ 7 - 7
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java

@@ -190,7 +190,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
         String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;
         createIndexWithSettings(client(), index, alias, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0));
-        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(numShards + randomIntBetween(1, numShards)));
+        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(numShards + randomIntBetween(1, numShards), null));
         updatePolicy(index, policy);
         assertBusy(() -> {
             String failedStep = getFailedStepForIndex(index);
@@ -198,7 +198,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
         }, 30, TimeUnit.SECONDS);
 
         // update policy to be correct
-        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(expectedFinalShards));
+        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(expectedFinalShards, null));
         updatePolicy(index, policy);
 
         // retry step
@@ -523,7 +523,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
         String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;
         createIndexWithSettings(client(), index, alias, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0));
-        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(expectedFinalShards));
+        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(expectedFinalShards, null));
         updatePolicy(index, policy);
         assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS);
         assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index)));
@@ -542,7 +542,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
         String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;
         createIndexWithSettings(client(), index, alias, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0));
-        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(numberOfShards));
+        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(numberOfShards, null));
         updatePolicy(index, policy);
         assertBusy(() -> {
             assertTrue(indexExists(index));
@@ -572,7 +572,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
                 .endObject()));
         assertOK(client().performRequest(request));
         // create delete policy
-        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(1), TimeValue.timeValueMillis(0));
+        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(1, null), TimeValue.timeValueMillis(0));
         // create index without policy
         createIndexWithSettings(client(), index, alias, Settings.builder()
             .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
@@ -613,7 +613,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
         // add a policy
         Map<String, LifecycleAction> hotActions = Map.of(
             RolloverAction.NAME, new RolloverAction(null, null, 1L),
-            ShrinkAction.NAME, new ShrinkAction(expectedFinalShards));
+            ShrinkAction.NAME, new ShrinkAction(expectedFinalShards, null));
         Map<String, Phase> phases = Map.of(
             "hot", new Phase("hot", TimeValue.ZERO, hotActions));
         LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, phases);
@@ -675,7 +675,7 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
         // assign the policy that'll attempt to shrink the index (disabling the migrate action as it'll otherwise wait for
         // all shards to be active and we want that to happen as part of the shrink action)
         MigrateAction migrateAction = new MigrateAction(false);
-        ShrinkAction shrinkAction = new ShrinkAction(expectedFinalShards);
+        ShrinkAction shrinkAction = new ShrinkAction(expectedFinalShards, null);
         Phase phase = new Phase("warm", TimeValue.ZERO, Map.of(migrateAction.getWriteableName(), migrateAction,
             shrinkAction.getWriteableName(), shrinkAction));
         LifecyclePolicy lifecyclePolicy = new LifecyclePolicy(policy, singletonMap(phase.getName(), phase));

+ 1 - 1
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeseriesMoveToStepIT.java

@@ -125,7 +125,7 @@ public class TimeseriesMoveToStepIT extends ESRestTestCase {
 
     public void testMoveToInjectedStep() throws Exception {
         String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;
-        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(1), TimeValue.timeValueHours(12));
+        createNewSingletonPolicy(client(), policy, "warm", new ShrinkAction(1, null), TimeValue.timeValueHours(12));
 
         createIndexWithSettings(client(), index, alias, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)

+ 2 - 2
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java

@@ -254,7 +254,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
         createPolicy(client(), policy,
             new Phase("hot", TimeValue.ZERO, Map.of(SetPriorityAction.NAME, new SetPriorityAction(10))),
             new Phase("warm", TimeValue.ZERO,
-                Map.of(ShrinkAction.NAME, new ShrinkAction(1), ForceMergeAction.NAME, new ForceMergeAction(1, null))
+                Map.of(ShrinkAction.NAME, new ShrinkAction(1, null), ForceMergeAction.NAME, new ForceMergeAction(1, null))
             ),
             new Phase("cold", TimeValue.ZERO, Map.of(SearchableSnapshotAction.NAME, new SearchableSnapshotAction(snapshotRepo))),
             null
@@ -322,7 +322,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
         createPolicy(client(), policy,
             new Phase("hot", TimeValue.ZERO, Map.of()),
             new Phase("warm", TimeValue.ZERO,
-                Map.of(ShrinkAction.NAME, new ShrinkAction(1), ForceMergeAction.NAME, new ForceMergeAction(1, null))
+                Map.of(ShrinkAction.NAME, new ShrinkAction(1, null), ForceMergeAction.NAME, new ForceMergeAction(1, null))
             ),
             new Phase("cold", TimeValue.ZERO, Map.of(FreezeAction.NAME, new FreezeAction())),
             null

+ 1 - 1
x-pack/plugin/ilm/src/internalClusterTest/java/org/elasticsearch/xpack/ilm/ILMMultiNodeIT.java

@@ -75,7 +75,7 @@ public class ILMMultiNodeIT extends ESIntegTestCase {
 
         RolloverAction rolloverAction = new RolloverAction(null, null, 1L);
         Phase hotPhase = new Phase("hot", TimeValue.ZERO, Collections.singletonMap(rolloverAction.getWriteableName(), rolloverAction));
-        ShrinkAction shrinkAction = new ShrinkAction(1);
+        ShrinkAction shrinkAction = new ShrinkAction(1, null);
         Phase warmPhase = new Phase("warm", TimeValue.ZERO, Collections.singletonMap(shrinkAction.getWriteableName(), shrinkAction));
         Map<String, Phase> phases = new HashMap<>();
         phases.put(hotPhase.getName(), hotPhase);

+ 1 - 1
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java

@@ -206,7 +206,7 @@ public class IndexLifecycleServiceTests extends ESTestCase {
 
     public void testRequestedStopInShrinkActionButNotShrinkStep() {
         // test all the shrink action steps that ILM can be stopped during (basically all of them minus the actual shrink)
-        ShrinkAction action = new ShrinkAction(1);
+        ShrinkAction action = new ShrinkAction(1, null);
         action.toSteps(mock(Client.class), "warm", randomStepKey()).stream()
             .map(sk -> sk.getKey().getName())
             .filter(name -> name.equals(ShrinkStep.NAME) == false)

+ 2 - 2
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/PolicyStepsRegistryTests.java

@@ -323,7 +323,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
         Mockito.when(client.settings()).thenReturn(Settings.EMPTY);
         String policyName = randomAlphaOfLength(5);
         Map<String, LifecycleAction> actions = new HashMap<>();
-        actions.put("shrink", new ShrinkAction(1));
+        actions.put("shrink", new ShrinkAction(1, null));
         Map<String, Phase> phases = new HashMap<>();
         Phase warmPhase = new Phase("warm", TimeValue.ZERO, actions);
         PhaseExecutionInfo pei = new PhaseExecutionInfo(policyName, warmPhase, 1, randomNonNegativeLong());
@@ -332,7 +332,7 @@ public class PolicyStepsRegistryTests extends ESTestCase {
         LifecyclePolicy newPolicy = new LifecyclePolicy(policyName, phases);
         // Modify the policy
         actions = new HashMap<>();
-        actions.put("shrink", new ShrinkAction(2));
+        actions.put("shrink", new ShrinkAction(2, null));
         phases = new HashMap<>();
         phases.put("warm", new Phase("warm", TimeValue.ZERO, actions));
         LifecyclePolicy updatedPolicy = new LifecyclePolicy(policyName, phases);