Browse Source

Add Rollup ILM Action (#65633)

this commit introduces a new Rollup ILM Action that allows indices
to be rolled up according to a specific rollup config. The
action also allows for the new rolled up index to be associated with
a different policy than the original/source index.

Relates #42720.

Closes #48003.
Tal Levy 4 years ago
parent
commit
5811deb580
21 changed files with 878 additions and 40 deletions
  1. 56 0
      docs/reference/ilm/actions/ilm-rollup.asciidoc
  2. 19 7
      docs/reference/ilm/ilm-actions.asciidoc
  3. 9 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java
  4. 135 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupILMAction.java
  5. 71 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupStep.java
  6. 29 13
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java
  7. 75 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UpdateRollupIndexPolicyStep.java
  8. 4 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyMetadataTests.java
  9. 6 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyTests.java
  10. 91 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupILMActionTests.java
  11. 149 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupStepTests.java
  12. 17 3
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java
  13. 144 0
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRollupIndexPolicyStepTests.java
  14. 5 2
      x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/action/PutLifecycleRequestTests.java
  15. 4 0
      x-pack/plugin/ilm/build.gradle
  16. 2 0
      x-pack/plugin/ilm/qa/multi-node/build.gradle
  17. 42 0
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java
  18. 1 1
      x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java
  19. 12 4
      x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java
  20. 2 2
      x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java
  21. 5 2
      x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleMetadataTests.java

+ 56 - 0
docs/reference/ilm/actions/ilm-rollup.asciidoc

@@ -0,0 +1,56 @@
+[role="xpack"]
+[[ilm-rollup]]
+=== Rollup
+
+Phases allowed: hot, cold.
+
+Aggregates an index's time series data and stores the results in a new read-only
+index. For example, you can roll up hourly data into daily or weekly summaries.
+
+For more information about rollup, see the <<rollup-api, rollup action documentation>>
+
+The name of the rolled up index will be the original index name of the managed index prefixed
+with `rollup-`.
+
+[[ilm-rollup-options]]
+==== Rollup options
+`config`::
+(Required, integer)
+The rollup configuration, a more detailed description of the
+rollup configuration specification can be found <<rollup-api-request-body,here>>.
+
+`rollup_policy`::
+(Optional, string)
+The name of an <<index-lifecycle-management, {ilm}>> ({ilm-init}) policy to associate
+with the newly created rollup index.
+
+[[ilm-rollup-ex]]
+==== Example
+
+[source,console]
+--------------------------------------------------
+PUT _ilm/policy/my_policy
+{
+  "policy": {
+    "phases": {
+      "cold": {
+        "actions": {
+          "rollup" : {
+            "config": {
+              "groups": {
+                "date_histogram": {
+                  "field": "@timestamp",
+                  "calendar_interval": "1y"
+                }
+              },
+              "metrics": [
+                { "field": "temperature", "metrics": [ "avg" ] }
+              ]
+            }
+          }
+        }
+      }
+    }
+  }
+}
+--------------------------------------------------

+ 19 - 7
docs/reference/ilm/ilm-actions.asciidoc

@@ -5,7 +5,7 @@
 
 <<ilm-allocate,Allocate>>::
 Move shards to nodes with different performance characteristics
-and reduce the number of replicas. 
+and reduce the number of replicas.
 
 <<ilm-delete,Delete>>::
 Permanently remove the index.
@@ -22,10 +22,10 @@ Move the index shards to the <<data-tiers, data tier>> that corresponds
 to the current {ilm-init} phase.
 
 <<ilm-readonly,Read only>>::
-Block write operations to the index. 
-  
+Block write operations to the index.
+
 <<ilm-rollover,Rollover>>::
-Remove the index as the write index for the rollover alias and 
+Remove the index as the write index for the rollover alias and
 start indexing to a new index.
 
 <<ilm-searchable-snapshot, Searchable snapshot>>::
@@ -35,17 +35,25 @@ and mount it as a searchable snapshot.
 
 <<ilm-set-priority,Set priority>>::
 Lower the priority of an index as it moves through the lifecycle
-to ensure that hot indices are recovered first. 
+to ensure that hot indices are recovered first.
 
 <<ilm-shrink,Shrink>>::
 Reduce the number of primary shards by shrinking the index into a new index.
 
 <<ilm-unfollow,Unfollow>>::
 Convert a follower index to a regular index.
-Performed automatically before a rollover, shrink, or searchable snapshot action. 
+Performed automatically before a rollover, shrink, or searchable snapshot action.
 
 <<ilm-wait-for-snapshot,Wait for snapshot>>::
-Ensure that a snapshot exists before deleting the index. 
+Ensure that a snapshot exists before deleting the index.
+
+ifdef::permanently-unreleased-branch[]
+
+<<ilm-rollup,Rollup>>::
+Aggregates an index's time series data and stores the results in a new read-only
+index. For example, you can roll up hourly data into daily or weekly summaries.
+
+endif::[]
 
 include::actions/ilm-allocate.asciidoc[]
 include::actions/ilm-delete.asciidoc[]
@@ -59,3 +67,7 @@ include::actions/ilm-set-priority.asciidoc[]
 include::actions/ilm-shrink.asciidoc[]
 include::actions/ilm-unfollow.asciidoc[]
 include::actions/ilm-wait-for-snapshot.asciidoc[]
+
+ifdef::permanently-unreleased-branch[]
+include::actions/ilm-rollup.asciidoc[]
+endif::[]

+ 9 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java

@@ -59,6 +59,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleType;
 import org.elasticsearch.xpack.core.ilm.MigrateAction;
 import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
 import org.elasticsearch.xpack.core.ilm.RolloverAction;
+import org.elasticsearch.xpack.core.ilm.RollupILMAction;
 import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
 import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
 import org.elasticsearch.xpack.core.ilm.ShrinkAction;
@@ -422,7 +423,7 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
 
     @Override
     public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
-        return Arrays.asList(
+        List<NamedWriteableRegistry.Entry> namedWriteables = new ArrayList<>(Arrays.asList(
             // graph
             new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.GRAPH, GraphFeatureSetUsage::new),
             // logstash
@@ -526,7 +527,13 @@ public class XPackClientPlugin extends Plugin implements ActionPlugin, NetworkPl
             // Data Tiers
             new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_TIERS, DataTiersFeatureSetUsage::new),
             new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.RUNTIME_FIELDS, RuntimeFieldsFeatureSetUsage::new)
-        );
+        ));
+
+        if (RollupV2.isEnabled()) {
+            namedWriteables.add(new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new));
+        }
+
+        return namedWriteables;
     }
 
     @Override

+ 135 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupILMAction.java

@@ -0,0 +1,135 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ilm;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ConstructingObjectParser;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.xpack.core.ilm.Step.StepKey;
+import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A {@link LifecycleAction} which calls {@link org.elasticsearch.xpack.core.rollup.action.RollupAction} on an index
+ */
+public class RollupILMAction implements LifecycleAction {
+    public static final String NAME = "rollup";
+
+    private static final ParseField CONFIG_FIELD = new ParseField("config");
+    private static final ParseField POLICY_FIELD = new ParseField("rollup_policy");
+
+    @SuppressWarnings("unchecked")
+    private static final ConstructingObjectParser<RollupILMAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
+        a -> new RollupILMAction((RollupActionConfig) a[0], (String) a[1]));
+
+    private final RollupActionConfig config;
+    private final String rollupPolicy;
+
+    static {
+        PARSER.declareField(ConstructingObjectParser.constructorArg(),
+            (p, c) -> RollupActionConfig.fromXContent(p), CONFIG_FIELD, ObjectParser.ValueType.OBJECT);
+        PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), POLICY_FIELD);
+    }
+
+    public static RollupILMAction parse(XContentParser parser) {
+        return PARSER.apply(parser, null);
+    }
+
+    public RollupILMAction(RollupActionConfig config, @Nullable String rollupPolicy) {
+        this.config = config;
+        this.rollupPolicy = rollupPolicy;
+    }
+
+    public RollupILMAction(StreamInput in) throws IOException {
+        this(new RollupActionConfig(in), in.readOptionalString());
+    }
+
+    @Override
+    public String getWriteableName() {
+        return NAME;
+    }
+
+    RollupActionConfig config() {
+        return config;
+    }
+
+    String rollupPolicy() {
+        return rollupPolicy;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field(CONFIG_FIELD.getPreferredName(), config);
+        if (rollupPolicy != null) {
+            builder.field(POLICY_FIELD.getPreferredName(), rollupPolicy);
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        config.writeTo(out);
+        out.writeOptionalString(rollupPolicy);
+    }
+
+    @Override
+    public boolean isSafeAction() {
+        return false;
+    }
+
+    @Override
+    public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
+        StepKey checkNotWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
+        StepKey readOnlyKey = new StepKey(phase, NAME, ReadOnlyStep.NAME);
+        StepKey rollupKey = new StepKey(phase, NAME, NAME);
+        CheckNotDataStreamWriteIndexStep checkNotWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNotWriteIndex,
+            readOnlyKey);
+        ReadOnlyStep readOnlyStep = new ReadOnlyStep(readOnlyKey, rollupKey, client);
+        if (rollupPolicy == null) {
+            Step rollupStep = new RollupStep(rollupKey, nextStepKey, client, config);
+            return List.of(checkNotWriteIndexStep, readOnlyStep, rollupStep);
+        } else {
+            StepKey updateRollupIndexPolicyStepKey = new StepKey(phase, NAME, UpdateRollupIndexPolicyStep.NAME);
+            Step rollupStep = new RollupStep(rollupKey, updateRollupIndexPolicyStepKey, client, config);
+            Step updateRollupIndexPolicyStep = new UpdateRollupIndexPolicyStep(updateRollupIndexPolicyStepKey, nextStepKey,
+                client, rollupPolicy);
+            return List.of(checkNotWriteIndexStep, readOnlyStep, rollupStep, updateRollupIndexPolicyStep);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        RollupILMAction that = (RollupILMAction) o;
+
+        return Objects.equals(this.config, that.config)
+            && Objects.equals(this.rollupPolicy, that.rollupPolicy);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(config, rollupPolicy);
+    }
+
+    @Override
+    public String toString() {
+        return Strings.toString(this);
+    }
+}

+ 71 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/RollupStep.java

@@ -0,0 +1,71 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ilm;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateObserver;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
+import org.elasticsearch.xpack.core.rollup.action.RollupAction;
+
+import java.util.Objects;
+
+/**
+ * Rolls up index using a {@link RollupActionConfig}
+ */
+public class RollupStep extends AsyncActionStep {
+    public static final String NAME = "rollup";
+    public static final String ROLLUP_INDEX_NAME_PREFIX = "rollup-";
+
+    private final RollupActionConfig config;
+
+    public RollupStep(StepKey key, StepKey nextStepKey, Client client, RollupActionConfig config) {
+        super(key, nextStepKey, client);
+        this.config = config;
+    }
+
+    public static String getRollupIndexName(String index) {
+        return ROLLUP_INDEX_NAME_PREFIX + index;
+    }
+
+    @Override
+    public boolean isRetryable() {
+        return true;
+    }
+
+    @Override
+    public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
+        String originalIndex = indexMetadata.getIndex().getName();
+        RollupAction.Request request = new RollupAction.Request(originalIndex, getRollupIndexName(originalIndex), config);
+        // currently RollupAction always acknowledges action was complete when no exceptions are thrown.
+        getClient().execute(RollupAction.INSTANCE, request,
+            ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
+    }
+
+    public RollupActionConfig getConfig() {
+        return config;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), config);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        RollupStep other = (RollupStep) obj;
+        return super.equals(obj)
+            && Objects.equals(config, other.config);
+    }
+}

+ 29 - 13
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleType.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.ilm;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.rollup.RollupV2;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,28 +41,43 @@ public class TimeseriesLifecycleType implements LifecycleType {
     static final String COLD_PHASE = "cold";
     static final String DELETE_PHASE = "delete";
     static final List<String> VALID_PHASES = Arrays.asList(HOT_PHASE, WARM_PHASE, COLD_PHASE, DELETE_PHASE);
-    static final List<String> ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME,
-        ReadOnlyAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, SearchableSnapshotAction.NAME);
+    static final List<String> ORDERED_VALID_HOT_ACTIONS;
     static final List<String> ORDERED_VALID_WARM_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, ReadOnlyAction.NAME,
         AllocateAction.NAME, MigrateAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME);
-    static final List<String> ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
-        MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME);
+    static final List<String> ORDERED_VALID_COLD_ACTIONS;
     static final List<String> ORDERED_VALID_DELETE_ACTIONS = Arrays.asList(WaitForSnapshotAction.NAME, DeleteAction.NAME);
-    static final Set<String> VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
+    static final Set<String> VALID_HOT_ACTIONS;
     static final Set<String> VALID_WARM_ACTIONS = Sets.newHashSet(ORDERED_VALID_WARM_ACTIONS);
-    static final Set<String> VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
+    static final Set<String> VALID_COLD_ACTIONS;
     static final Set<String> VALID_DELETE_ACTIONS = Sets.newHashSet(ORDERED_VALID_DELETE_ACTIONS);
-    private static final Map<String, Set<String>> ALLOWED_ACTIONS = Map.of(
-        HOT_PHASE, VALID_HOT_ACTIONS,
-        WARM_PHASE, VALID_WARM_ACTIONS,
-        COLD_PHASE, VALID_COLD_ACTIONS,
-        DELETE_PHASE, VALID_DELETE_ACTIONS);
+    private static final Map<String, Set<String>> ALLOWED_ACTIONS;
 
     static final Set<String> HOT_ACTIONS_THAT_REQUIRE_ROLLOVER = Sets.newHashSet(ReadOnlyAction.NAME, ShrinkAction.NAME,
-        ForceMergeAction.NAME, SearchableSnapshotAction.NAME);
+        ForceMergeAction.NAME, RollupILMAction.NAME, SearchableSnapshotAction.NAME);
     // a set of actions that cannot be defined (executed) after the managed index has been mounted as searchable snapshot
     static final Set<String> ACTIONS_CANNOT_FOLLOW_SEARCHABLE_SNAPSHOT = Sets.newHashSet(ShrinkAction.NAME, ForceMergeAction.NAME,
-        FreezeAction.NAME, SearchableSnapshotAction.NAME);
+        FreezeAction.NAME, SearchableSnapshotAction.NAME, RollupILMAction.NAME);
+
+    static {
+        if (RollupV2.isEnabled()) {
+            ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME,
+                ReadOnlyAction.NAME, RollupILMAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, SearchableSnapshotAction.NAME);
+            ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
+                MigrateAction.NAME, FreezeAction.NAME, RollupILMAction.NAME, SearchableSnapshotAction.NAME);
+        } else {
+            ORDERED_VALID_HOT_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, RolloverAction.NAME,
+                ReadOnlyAction.NAME, ShrinkAction.NAME, ForceMergeAction.NAME, SearchableSnapshotAction.NAME);
+            ORDERED_VALID_COLD_ACTIONS = Arrays.asList(SetPriorityAction.NAME, UnfollowAction.NAME, AllocateAction.NAME,
+                MigrateAction.NAME, FreezeAction.NAME, SearchableSnapshotAction.NAME);
+        }
+        VALID_HOT_ACTIONS = Sets.newHashSet(ORDERED_VALID_HOT_ACTIONS);
+        VALID_COLD_ACTIONS = Sets.newHashSet(ORDERED_VALID_COLD_ACTIONS);
+        ALLOWED_ACTIONS = new HashMap<>();
+        ALLOWED_ACTIONS.put(HOT_PHASE, VALID_HOT_ACTIONS);
+        ALLOWED_ACTIONS.put(WARM_PHASE, VALID_WARM_ACTIONS);
+        ALLOWED_ACTIONS.put(COLD_PHASE, VALID_COLD_ACTIONS);
+        ALLOWED_ACTIONS.put(DELETE_PHASE, VALID_DELETE_ACTIONS);
+    }
 
     private TimeseriesLifecycleType() {
     }

+ 75 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/UpdateRollupIndexPolicyStep.java

@@ -0,0 +1,75 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ilm;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateObserver;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.settings.Settings;
+
+import java.util.Objects;
+
+/**
+ * Updates the lifecycle policy for the rollup index for the original/currently managed index
+ */
+public class UpdateRollupIndexPolicyStep extends AsyncActionStep {
+    public static final String NAME = "update-rollup-policy";
+
+    private final String rollupPolicy;
+
+    public UpdateRollupIndexPolicyStep(StepKey key, StepKey nextStepKey, Client client, String rollupPolicy) {
+        super(key, nextStepKey, client);
+        this.rollupPolicy = rollupPolicy;
+    }
+
+    @Override
+    public boolean isRetryable() {
+        return true;
+    }
+
+    public String getRollupPolicy() {
+        return rollupPolicy;
+    }
+
+    @Override
+    public void performAction(IndexMetadata indexMetadata, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
+        String rollupIndex = RollupStep.getRollupIndexName(indexMetadata.getIndex().getName());
+        Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, rollupPolicy).build();
+        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(rollupIndex)
+            .masterNodeTimeout(getMasterTimeout(currentState))
+            .settings(settings);
+        getClient().admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> {
+            if (response.isAcknowledged()) {
+                listener.onResponse(true);
+            } else {
+                listener.onFailure(new ElasticsearchException("settings update not acknowledged in step [" + getKey().toString() + "]"));
+            }
+        }, listener::onFailure));
+    }
+
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), rollupPolicy);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        UpdateRollupIndexPolicyStep other = (UpdateRollupIndexPolicyStep) obj;
+        return super.equals(obj) &&
+                Objects.equals(rollupPolicy, other.rollupPolicy);
+    }
+}

+ 4 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyMetadataTests.java

@@ -49,7 +49,8 @@ public class LifecyclePolicyMetadataTests extends AbstractSerializingTestCase<Li
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
-                new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new)
+                new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
+                new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new)
             ));
     }
 
@@ -72,7 +73,8 @@ public class LifecyclePolicyMetadataTests extends AbstractSerializingTestCase<Li
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(FreezeAction.NAME), FreezeAction::parse),
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SetPriorityAction.NAME), SetPriorityAction::parse),
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse),
-            new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
+            new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse),
+            new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RollupILMAction.NAME), RollupILMAction::parse)
         ));
         return new NamedXContentRegistry(entries);
     }

+ 6 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/LifecyclePolicyTests.java

@@ -59,7 +59,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
-                new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new)
+                new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new),
+                new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new)
             ));
     }
 
@@ -82,7 +83,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse),
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse),
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME),
-                SearchableSnapshotAction::parse)
+                SearchableSnapshotAction::parse),
+            new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RollupILMAction.NAME), RollupILMAction::parse)
         ));
         return new NamedXContentRegistry(entries);
     }
@@ -205,6 +207,8 @@ public class LifecyclePolicyTests extends AbstractSerializingTestCase<LifecycleP
                         return new SearchableSnapshotAction(randomAlphaOfLengthBetween(1, 10));
                     case MigrateAction.NAME:
                         return new MigrateAction(false);
+                    case RollupILMAction.NAME:
+                        return RollupILMActionTests.randomInstance();
                     default:
                         throw new IllegalArgumentException("invalid action [" + action + "]");
                 }};

+ 91 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupILMActionTests.java

@@ -0,0 +1,91 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ilm;
+
+import org.elasticsearch.common.io.stream.Writeable.Reader;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.EqualsHashCodeTestUtils;
+import org.elasticsearch.xpack.core.ilm.Step.StepKey;
+import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
+import org.elasticsearch.xpack.core.rollup.RollupActionConfigTests;
+import org.elasticsearch.xpack.core.rollup.job.MetricConfig;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class RollupILMActionTests extends AbstractActionTestCase<RollupILMAction> {
+
+    static RollupILMAction randomInstance() {
+        return new RollupILMAction(RollupActionConfigTests.randomConfig(random()),
+                randomBoolean() ? randomAlphaOfLength(5) : null);
+    }
+
+    @Override
+    protected RollupILMAction doParseInstance(XContentParser parser) {
+        return RollupILMAction.parse(parser);
+    }
+
+    @Override
+    protected RollupILMAction createTestInstance() {
+        return randomInstance();
+    }
+
+    @Override
+    protected Reader<RollupILMAction> instanceReader() {
+        return RollupILMAction::new;
+    }
+
+    @Override
+    public boolean isSafeAction() {
+        return false;
+    }
+
+    @Override
+    public void testToSteps() {
+        RollupILMAction action = new RollupILMAction(RollupActionConfigTests.randomConfig(random()), null);
+        String phase = randomAlphaOfLengthBetween(1, 10);
+        StepKey nextStepKey = new StepKey(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10),
+            randomAlphaOfLengthBetween(1, 10));
+        List<Step> steps = action.toSteps(null, phase, nextStepKey);
+        assertNotNull(steps);
+        assertEquals(3, steps.size());
+        assertThat(steps.get(0).getKey().getName(), equalTo(CheckNotDataStreamWriteIndexStep.NAME));
+        assertThat(steps.get(0).getNextStepKey().getName(), equalTo(ReadOnlyStep.NAME));
+        assertThat(steps.get(1).getKey().getName(), equalTo(ReadOnlyStep.NAME));
+        assertThat(steps.get(1).getNextStepKey().getName(), equalTo(RollupStep.NAME));
+        assertThat(steps.get(2).getKey().getName(), equalTo(RollupStep.NAME));
+        assertThat(steps.get(2).getNextStepKey(), equalTo(nextStepKey));
+    }
+
+    public void testEqualsAndHashCode() {
+        EqualsHashCodeTestUtils.checkEqualsAndHashCode(createTestInstance(), this::copy, this::notCopy);
+    }
+
+    RollupILMAction copy(RollupILMAction rollupILMAction) {
+        return new RollupILMAction(rollupILMAction.config(), rollupILMAction.rollupPolicy());
+    }
+
+    RollupILMAction notCopy(RollupILMAction rollupILMAction) {
+        RollupActionConfig newConfig = rollupILMAction.config();
+        String newRollupPolicy = rollupILMAction.rollupPolicy();
+        switch (randomIntBetween(0, 1)) {
+            case 0:
+                List<MetricConfig> metricConfigs = new ArrayList<>(rollupILMAction.config().getMetricsConfig());
+                metricConfigs.add(new MetricConfig(randomAlphaOfLength(4), Collections.singletonList("max")));
+                newConfig = new RollupActionConfig(rollupILMAction.config().getGroupConfig(), metricConfigs);
+                break;
+            case 1:
+                newRollupPolicy = randomAlphaOfLength(3);
+                break;
+            default:
+                throw new IllegalStateException("unreachable branch");
+        }
+        return new RollupILMAction(newConfig, newRollupPolicy);
+    }
+}

+ 149 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/RollupStepTests.java

@@ -0,0 +1,149 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ilm;
+
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.xpack.core.ilm.Step.StepKey;
+import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
+import org.elasticsearch.xpack.core.rollup.RollupActionConfigTests;
+import org.elasticsearch.xpack.core.rollup.action.RollupAction;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+import static org.elasticsearch.cluster.DataStreamTestHelper.createTimestampField;
+import static org.hamcrest.Matchers.equalTo;
+
+public class RollupStepTests extends AbstractStepTestCase<RollupStep> {
+
+    @Override
+    public RollupStep createRandomInstance() {
+        StepKey stepKey = randomStepKey();
+        StepKey nextStepKey = randomStepKey();
+        RollupActionConfig config = RollupActionConfigTests.randomConfig(random());
+        return new RollupStep(stepKey, nextStepKey, client, config);
+    }
+
+    @Override
+    public RollupStep mutateInstance(RollupStep instance) {
+        StepKey key = instance.getKey();
+        StepKey nextKey = instance.getNextStepKey();
+
+        switch (between(0, 1)) {
+            case 0:
+                key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
+                break;
+            case 1:
+                nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
+                break;
+            default:
+                throw new AssertionError("Illegal randomisation branch");
+        }
+
+        return new RollupStep(key, nextKey, instance.getClient(), instance.getConfig());
+    }
+
+    @Override
+    public RollupStep copyInstance(RollupStep instance) {
+        return new RollupStep(instance.getKey(), instance.getNextStepKey(), instance.getClient(), instance.getConfig());
+    }
+
+    private IndexMetadata getIndexMetadata(String index) {
+        return IndexMetadata.builder(index)
+            .settings(settings(Version.CURRENT))
+            .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
+    }
+
+    private static void assertRollupActionRequest(RollupAction.Request request, String sourceIndex) {
+        assertNotNull(request);
+        assertThat(request.getSourceIndex(), equalTo(sourceIndex));
+    }
+
+    public void testPerformAction() {
+        String index = randomAlphaOfLength(5);
+        IndexMetadata indexMetadata = getIndexMetadata(index);
+
+        RollupStep step = createRandomInstance();
+
+        mockClientRollupCall(index);
+
+        SetOnce<Boolean> actionCompleted = new SetOnce<>();
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(indexMetadata, true)
+            )
+            .build();
+        step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
+
+            @Override
+            public void onResponse(boolean complete) {
+                actionCompleted.set(complete);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                throw new AssertionError("Unexpected method call", e);
+            }
+        });
+
+        assertEquals(true, actionCompleted.get());
+    }
+
+    public void testPerformActionOnDataStream() {
+        String dataStreamName = "test-datastream";
+        String backingIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
+        IndexMetadata indexMetadata = IndexMetadata.builder(backingIndexName)
+            .settings(settings(Version.CURRENT))
+            .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
+
+        RollupStep step = createRandomInstance();
+
+        mockClientRollupCall(backingIndexName);
+
+        SetOnce<Boolean> actionCompleted = new SetOnce<>();
+        ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+            .metadata(
+                Metadata.builder()
+                    .put(new DataStream(dataStreamName, createTimestampField("@timestamp"),
+                        List.of(indexMetadata.getIndex())))
+                    .put(indexMetadata, true)
+            )
+            .build();
+        step.performAction(indexMetadata, clusterState, null, new AsyncActionStep.Listener() {
+
+            @Override
+            public void onResponse(boolean complete) {
+                actionCompleted.set(complete);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                throw new AssertionError("Unexpected method call", e);
+            }
+        });
+
+        assertEquals(true, actionCompleted.get());
+    }
+
+    private void mockClientRollupCall(String sourceIndex) {
+        Mockito.doAnswer(invocation -> {
+            RollupAction.Request request = (RollupAction.Request) invocation.getArguments()[1];
+            @SuppressWarnings("unchecked")
+            ActionListener<RollupAction.Response> listener = (ActionListener<RollupAction.Response>) invocation.getArguments()[2];
+            assertRollupActionRequest(request, sourceIndex);
+            listener.onResponse(new RollupAction.Response(true));
+            return null;
+        }).when(client).execute(Mockito.any(), Mockito.any(), Mockito.any());
+    }
+}

+ 17 - 3
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/TimeseriesLifecycleTypeTests.java

@@ -7,7 +7,12 @@ package org.elasticsearch.xpack.core.ilm;
 
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
+import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig;
+import org.elasticsearch.xpack.core.rollup.RollupActionGroupConfig;
+import org.elasticsearch.xpack.core.rollup.job.MetricConfig;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -56,6 +61,9 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
     // keeping the migrate action disabled as otherwise it could conflict with the allocate action if both are randomly selected for the
     // same phase
     private static final MigrateAction TEST_MIGRATE_ACTION = new MigrateAction(false);
+    private static final RollupILMAction TEST_ROLLUP_ACTION =new RollupILMAction(new RollupActionConfig(
+        new RollupActionGroupConfig(new RollupActionDateHistogramGroupConfig.FixedInterval("field", DateHistogramInterval.DAY)),
+        Collections.singletonList(new MetricConfig("field", Collections.singletonList("max")))), null);
 
     public void testValidatePhases() {
         boolean invalid = randomBoolean();
@@ -194,9 +202,9 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
     }
 
     public void testActionsThatCannotFollowSearchableSnapshot() {
-        assertThat(ACTIONS_CANNOT_FOLLOW_SEARCHABLE_SNAPSHOT.size(), is(4));
+        assertThat(ACTIONS_CANNOT_FOLLOW_SEARCHABLE_SNAPSHOT.size(), is(5));
         assertThat(ACTIONS_CANNOT_FOLLOW_SEARCHABLE_SNAPSHOT, containsInAnyOrder(ShrinkAction.NAME, FreezeAction.NAME,
-            ForceMergeAction.NAME, SearchableSnapshotAction.NAME));
+            ForceMergeAction.NAME, RollupILMAction.NAME, SearchableSnapshotAction.NAME));
     }
 
     public void testValidateActionsFollowingSearchableSnapshot() {
@@ -206,7 +214,8 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
 
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
             () -> TimeseriesLifecycleType.validateActionsFollowingSearchableSnapshot(List.of(hotPhase, warmPhase, coldPhase)));
-        assertThat(e.getMessage(), is("phases [warm,cold] define one or more of [searchable_snapshot, forcemerge, freeze, shrink] actions" +
+        assertThat(e.getMessage(), is(
+            "phases [warm,cold] define one or more of [searchable_snapshot, forcemerge, freeze, shrink, rollup] actions" +
             " which are not allowed after a managed index is mounted as a searchable snapshot"));
     }
 
@@ -216,6 +225,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
             phaseMap.put(phaseName, new Phase(phaseName, TimeValue.ZERO, Collections.emptyMap()));
         }
 
+
         assertTrue(isSorted(TimeseriesLifecycleType.INSTANCE.getOrderedPhases(phaseMap), Phase::getName, VALID_PHASES));
     }
 
@@ -620,6 +630,8 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
                 return new UnfollowAction();
             case MigrateAction.NAME:
                 return new MigrateAction(true);
+            case RollupILMAction.NAME:
+                return TEST_ROLLUP_ACTION;
             }
             return new DeleteAction();
         }).collect(Collectors.toConcurrentMap(LifecycleAction::getWriteableName, Function.identity()));
@@ -693,6 +705,8 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase {
                 return TEST_SEARCHABLE_SNAPSHOT_ACTION;
             case MigrateAction.NAME:
                 return TEST_MIGRATE_ACTION;
+            case RollupILMAction.NAME:
+                return TEST_ROLLUP_ACTION;
             default:
                 throw new IllegalArgumentException("unsupported timeseries phase action [" + actionName + "]");
         }

+ 144 - 0
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/UpdateRollupIndexPolicyStepTests.java

@@ -0,0 +1,144 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.ilm;
+
+
+import org.apache.lucene.util.SetOnce;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.xpack.core.ilm.AsyncActionStep.Listener;
+import org.elasticsearch.xpack.core.ilm.Step.StepKey;
+import org.mockito.Mockito;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class UpdateRollupIndexPolicyStepTests extends AbstractStepMasterTimeoutTestCase<UpdateRollupIndexPolicyStep> {
+
+    @Override
+    public UpdateRollupIndexPolicyStep createRandomInstance() {
+        StepKey stepKey = randomStepKey();
+        StepKey nextStepKey = randomStepKey();
+        String rollupPolicy = randomAlphaOfLength(10);
+
+        return new UpdateRollupIndexPolicyStep(stepKey, nextStepKey, client, rollupPolicy);
+    }
+
+    @Override
+    public UpdateRollupIndexPolicyStep mutateInstance(UpdateRollupIndexPolicyStep instance) {
+        StepKey key = instance.getKey();
+        StepKey nextKey = instance.getNextStepKey();
+        String rollupPolicy = instance.getRollupPolicy();
+
+        switch (between(0, 2)) {
+        case 0:
+            key = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
+            break;
+        case 1:
+            nextKey = new StepKey(key.getPhase(), key.getAction(), key.getName() + randomAlphaOfLength(5));
+            break;
+        case 2:
+            rollupPolicy = randomAlphaOfLength(5);
+            break;
+        default:
+            throw new AssertionError("Illegal randomisation branch");
+        }
+
+        return new UpdateRollupIndexPolicyStep(key, nextKey, client, rollupPolicy);
+    }
+
+    @Override
+    public UpdateRollupIndexPolicyStep copyInstance(UpdateRollupIndexPolicyStep instance) {
+        return new UpdateRollupIndexPolicyStep(instance.getKey(), instance.getNextStepKey(), instance.getClient(),
+            instance.getRollupPolicy());
+    }
+
+    @Override
+    protected IndexMetadata getIndexMetadata() {
+        return IndexMetadata.builder(randomAlphaOfLength(10)).settings(settings(Version.CURRENT))
+            .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
+    }
+
+    public void testPerformAction() {
+        IndexMetadata indexMetadata = getIndexMetadata();
+        String rollupIndex = RollupStep.getRollupIndexName(indexMetadata.getIndex().getName());
+        UpdateRollupIndexPolicyStep step = createRandomInstance();
+        Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, step.getRollupPolicy()).build();
+
+        Mockito.doAnswer(invocation -> {
+            UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
+            @SuppressWarnings("unchecked")
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[1];
+            assertThat(request.settings(), equalTo(settings));
+            assertThat(request.indices(), equalTo(new String[] { rollupIndex }));
+            listener.onResponse(AcknowledgedResponse.TRUE);
+            return null;
+        }).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
+
+        SetOnce<Boolean> actionCompleted = new SetOnce<>();
+
+        step.performAction(indexMetadata, emptyClusterState(), null, new Listener() {
+
+            @Override
+            public void onResponse(boolean complete) {
+                actionCompleted.set(complete);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                throw new AssertionError("Unexpected method call", e);
+            }
+        });
+
+        assertEquals(true, actionCompleted.get());
+
+        Mockito.verify(client, Mockito.only()).admin();
+        Mockito.verify(adminClient, Mockito.only()).indices();
+        Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
+    }
+
+    public void testPerformActionFailure() {
+        IndexMetadata indexMetadata = getIndexMetadata();
+        String rollupIndex = RollupStep.getRollupIndexName(indexMetadata.getIndex().getName());
+        Exception exception = new RuntimeException();
+        UpdateRollupIndexPolicyStep step = createRandomInstance();
+        Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, step.getRollupPolicy()).build();
+
+        Mockito.doAnswer(invocation -> {
+            UpdateSettingsRequest request = (UpdateSettingsRequest) invocation.getArguments()[0];
+            @SuppressWarnings("unchecked")
+            ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocation.getArguments()[1];
+            assertThat(request.settings(), equalTo(settings));
+            assertThat(request.indices(), equalTo(new String[] { rollupIndex }));
+            listener.onFailure(exception);
+            return null;
+        }).when(indicesClient).updateSettings(Mockito.any(), Mockito.any());
+
+        SetOnce<Boolean> exceptionThrown = new SetOnce<>();
+        step.performAction(indexMetadata, emptyClusterState(), null, new Listener() {
+
+            @Override
+            public void onResponse(boolean complete) {
+                throw new AssertionError("Unexpected method call");
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assertSame(exception, e);
+                exceptionThrown.set(true);
+            }
+        });
+
+        assertEquals(true, exceptionThrown.get());
+
+        Mockito.verify(client, Mockito.only()).admin();
+        Mockito.verify(adminClient, Mockito.only()).indices();
+        Mockito.verify(indicesClient, Mockito.only()).updateSettings(Mockito.any(), Mockito.any());
+    }
+}

+ 5 - 2
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/action/PutLifecycleRequestTests.java

@@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleType;
 import org.elasticsearch.xpack.core.ilm.MigrateAction;
 import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
 import org.elasticsearch.xpack.core.ilm.RolloverAction;
+import org.elasticsearch.xpack.core.ilm.RollupILMAction;
 import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
 import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
 import org.elasticsearch.xpack.core.ilm.ShrinkAction;
@@ -77,7 +78,8 @@ public class PutLifecycleRequestTests extends AbstractSerializingTestCase<Reques
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
-                new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new)
+                new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new),
+                new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new)
             ));
     }
 
@@ -100,7 +102,8 @@ public class PutLifecycleRequestTests extends AbstractSerializingTestCase<Reques
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME),
                 SearchableSnapshotAction::parse),
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse),
-            new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse)
+            new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse),
+            new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RollupILMAction.NAME), RollupILMAction::parse)
         ));
         return new NamedXContentRegistry(entries);
     }

+ 4 - 0
x-pack/plugin/ilm/build.gradle

@@ -18,3 +18,7 @@ dependencies {
 }
 
 addQaCheckDependencies()
+
+test {
+  systemProperty 'es.rollup_v2_feature_flag_enabled', 'true'
+}

+ 2 - 0
x-pack/plugin/ilm/qa/multi-node/build.gradle

@@ -16,6 +16,7 @@ File repoDir = file("$buildDir/testclusters/repo")
 tasks.named("javaRestTest").configure {
   /* To support taking index snapshots, we have to set path.repo setting */
   systemProperty 'tests.path.repo', repoDir
+  systemProperty 'es.rollup_v2_feature_flag_enabled', 'true'
 }
 
 testClusters.all {
@@ -30,6 +31,7 @@ testClusters.all {
   setting 'indices.lifecycle.poll_interval', '1000ms'
   setting 'logger.org.elasticsearch.xpack.core.ilm', 'TRACE'
   setting 'logger.org.elasticsearch.xpack.ilm', 'TRACE'
+  systemProperty 'es.rollup_v2_feature_flag_enabled', 'true'
 }
 
 if (BuildParams.inFipsJvm){

+ 42 - 0
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java

@@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.EngineConfig;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
 import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.rest.ESRestTestCase;
 import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
@@ -41,6 +42,8 @@ import org.elasticsearch.xpack.core.ilm.Phase;
 import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
 import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
 import org.elasticsearch.xpack.core.ilm.RolloverAction;
+import org.elasticsearch.xpack.core.ilm.RollupILMAction;
+import org.elasticsearch.xpack.core.ilm.RollupStep;
 import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
 import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
 import org.elasticsearch.xpack.core.ilm.SetSingleNodeAllocateStep;
@@ -50,12 +53,17 @@ import org.elasticsearch.xpack.core.ilm.Step;
 import org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep;
 import org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep;
 import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
+import org.elasticsearch.xpack.core.rollup.RollupActionConfig;
+import org.elasticsearch.xpack.core.rollup.RollupActionDateHistogramGroupConfig;
+import org.elasticsearch.xpack.core.rollup.RollupActionGroupConfig;
+import org.elasticsearch.xpack.core.rollup.job.MetricConfig;
 import org.hamcrest.Matchers;
 import org.junit.Before;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -1468,6 +1476,40 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
         }, 30, TimeUnit.SECONDS));
     }
 
+    public void testRollupIndex() throws Exception {
+        createIndexWithSettings(client(), index, alias, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0));
+        String rollupIndex = RollupStep.getRollupIndexName(index);
+        index(client(), index, "_id", "timestamp", "2020-01-01T05:10:00Z", "volume", 11.0);
+        RollupActionConfig rollupConfig = new RollupActionConfig(
+            new RollupActionGroupConfig(new RollupActionDateHistogramGroupConfig.FixedInterval("timestamp", DateHistogramInterval.DAY)),
+            Collections.singletonList(new MetricConfig("volume", Collections.singletonList("max"))));
+
+        createNewSingletonPolicy(client(), policy, "cold", new RollupILMAction(rollupConfig, null));
+        updatePolicy(index, policy);
+
+        assertBusy(() -> assertTrue(indexExists(rollupIndex)));
+        assertBusy(() -> assertFalse(getOnlyIndexSettings(client(), rollupIndex).containsKey(LifecycleSettings.LIFECYCLE_NAME)));
+        assertBusy(() -> assertTrue(indexExists(index)));
+    }
+
+    public void testRollupIndexAndSetNewRollupPolicy() throws Exception {
+        createIndexWithSettings(client(), index, alias, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0));
+        String rollupIndex = RollupStep.ROLLUP_INDEX_NAME_PREFIX + index;
+        index(client(), index, "_id", "timestamp", "2020-01-01T05:10:00Z", "volume", 11.0);
+        RollupActionConfig rollupConfig = new RollupActionConfig(
+            new RollupActionGroupConfig(new RollupActionDateHistogramGroupConfig.FixedInterval("timestamp", DateHistogramInterval.DAY)),
+            Collections.singletonList(new MetricConfig("volume", Collections.singletonList("max"))));
+
+        createNewSingletonPolicy(client(), policy, "cold", new RollupILMAction(rollupConfig, policy));
+        updatePolicy(index, policy);
+
+        assertBusy(() -> assertTrue(indexExists(rollupIndex)));
+        assertBusy(() -> assertThat(getOnlyIndexSettings(client(), rollupIndex).get(LifecycleSettings.LIFECYCLE_NAME), equalTo(policy)));
+        assertBusy(() -> assertTrue(indexExists(index)));
+    }
+
     // This method should be called inside an assertBusy, it has no retry logic of its own
     private void assertHistoryIsPresent(String policyName, String indexName, boolean success, String stepName) throws IOException {
         assertHistoryIsPresent(policyName, indexName, success, null, null, stepName);

+ 1 - 1
x-pack/plugin/ilm/qa/multi-node/src/javaRestTest/java/org/elasticsearch/xpack/ilm/actions/SearchableSnapshotActionIT.java

@@ -229,7 +229,7 @@ public class SearchableSnapshotActionIT extends ESRestTestCase {
             )
         );
 
-        assertThat(exception.getMessage(), is("phases [warm,cold] define one or more of [searchable_snapshot, forcemerge, freeze, shrink]" +
+        assertThat(exception.getMessage(), is("phases [warm,cold] define one or more of [searchable_snapshot, forcemerge, freeze, shrink, rollup]" +
             " actions which are not allowed after a managed index is mounted as a searchable snapshot"));
     }
 

+ 12 - 4
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java

@@ -33,6 +33,7 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
+import org.elasticsearch.rollup.RollupV2;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.watcher.ResourceWatcherService;
@@ -49,6 +50,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleType;
 import org.elasticsearch.xpack.core.ilm.MigrateAction;
 import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
 import org.elasticsearch.xpack.core.ilm.RolloverAction;
+import org.elasticsearch.xpack.core.ilm.RollupILMAction;
 import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
 import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
 import org.elasticsearch.xpack.core.ilm.ShrinkAction;
@@ -209,8 +211,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
     }
 
     @Override
-    public List<org.elasticsearch.common.xcontent.NamedXContentRegistry.Entry> getNamedXContent() {
-        return Arrays.asList(
+    public List<NamedXContentRegistry.Entry> getNamedXContent() {
+        List<NamedXContentRegistry.Entry> entries = new ArrayList<>(Arrays.asList(
             // Custom Metadata
             new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(IndexLifecycleMetadata.TYPE),
                 parser -> IndexLifecycleMetadata.PARSER.parse(parser, null)),
@@ -234,8 +236,14 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME),
                 SearchableSnapshotAction::parse),
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME),
-                MigrateAction::parse)
-        );
+                MigrateAction::parse)));
+
+        if (RollupV2.isEnabled()) {
+            entries.add(new NamedXContentRegistry.Entry(LifecycleAction.class,
+                new ParseField(RollupILMAction.NAME), RollupILMAction::parse));
+        }
+
+        return entries;
     }
 
     @Override

+ 2 - 2
x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java

@@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
 import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
 import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
 import org.elasticsearch.xpack.core.ilm.OperationMode;
+import org.elasticsearch.xpack.core.ilm.RollupStep;
 import org.elasticsearch.xpack.core.ilm.ShrinkStep;
 import org.elasticsearch.xpack.core.ilm.Step.StepKey;
 import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
@@ -40,7 +41,6 @@ import org.elasticsearch.xpack.ilm.history.ILMHistoryStore;
 
 import java.io.Closeable;
 import java.time.Clock;
-import java.util.Collections;
 import java.util.Set;
 import java.util.function.LongSupplier;
 
@@ -53,7 +53,7 @@ import static org.elasticsearch.xpack.core.ilm.IndexLifecycleOriginationDatePars
 public class IndexLifecycleService
     implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, IndexEventListener {
     private static final Logger logger = LogManager.getLogger(IndexLifecycleService.class);
-    private static final Set<String> IGNORE_STEPS_MAINTENANCE_REQUESTED = Collections.singleton(ShrinkStep.NAME);
+    private static final Set<String> IGNORE_STEPS_MAINTENANCE_REQUESTED = Set.of(ShrinkStep.NAME, RollupStep.NAME);
     private volatile boolean isMaster = false;
     private volatile TimeValue pollInterval;
 

+ 5 - 2
x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleMetadataTests.java

@@ -33,6 +33,7 @@ import org.elasticsearch.xpack.core.ilm.OperationMode;
 import org.elasticsearch.xpack.core.ilm.Phase;
 import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
 import org.elasticsearch.xpack.core.ilm.RolloverAction;
+import org.elasticsearch.xpack.core.ilm.RollupILMAction;
 import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
 import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
 import org.elasticsearch.xpack.core.ilm.ShrinkAction;
@@ -94,7 +95,8 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
                 new NamedWriteableRegistry.Entry(LifecycleAction.class, MigrateAction.NAME, MigrateAction::new),
-                new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new)
+                new NamedWriteableRegistry.Entry(LifecycleAction.class, SearchableSnapshotAction.NAME, SearchableSnapshotAction::new),
+                new NamedWriteableRegistry.Entry(LifecycleAction.class, RollupILMAction.NAME, RollupILMAction::new)
             ));
     }
 
@@ -117,7 +119,8 @@ public class IndexLifecycleMetadataTests extends AbstractDiffableSerializationTe
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(UnfollowAction.NAME), UnfollowAction::parse),
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(MigrateAction.NAME), MigrateAction::parse),
             new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(SearchableSnapshotAction.NAME),
-                SearchableSnapshotAction::parse)
+                SearchableSnapshotAction::parse),
+            new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RollupILMAction.NAME), RollupILMAction::parse)
         ));
         return new NamedXContentRegistry(entries);
     }